<PackageReference Include="System.Reactive" Version="7.0.0-preview.1" />

ReplaySubject<T>

public sealed class ReplaySubject<T> : SubjectBase<T>
using System.Collections.Generic; using System.Reactive.Concurrency; using System.Reactive.Disposables; using System.Runtime.CompilerServices; using System.Threading; namespace System.Reactive.Subjects { [System.Runtime.CompilerServices.NullableContext(1)] [System.Runtime.CompilerServices.Nullable(new byte[] { 0, 1 })] public sealed class ReplaySubject<[System.Runtime.CompilerServices.Nullable(2)] T> : SubjectBase<T> { [System.Runtime.CompilerServices.Nullable(new byte[] { 0, 1 })] private abstract class ReplayBase : SubjectBase<T> { [System.Runtime.CompilerServices.NullableContext(0)] private sealed class Subscription : IDisposable { [System.Runtime.CompilerServices.Nullable(new byte[] { 1, 0 })] private ReplayBase _subject; [System.Runtime.CompilerServices.Nullable(new byte[] { 2, 1 })] private System.Reactive.IScheduledObserver<T> _observer; [System.Runtime.CompilerServices.NullableContext(1)] public Subscription([System.Runtime.CompilerServices.Nullable(new byte[] { 1, 0 })] ReplayBase subject, System.Reactive.IScheduledObserver<T> observer) { _subject = subject; _observer = observer; } public void Dispose() { System.Reactive.IScheduledObserver<T> scheduledObserver = Interlocked.Exchange<System.Reactive.IScheduledObserver<T>>(ref _observer, (System.Reactive.IScheduledObserver<T>)null); if (scheduledObserver != null) { scheduledObserver.Dispose(); _subject.Unsubscribe(scheduledObserver); _subject = null; } } } private readonly object _gate = new object(); private System.Reactive.ImmutableList<System.Reactive.IScheduledObserver<T>> _observers; private bool _isStopped; [System.Runtime.CompilerServices.Nullable(2)] private Exception _error; private bool _isDisposed; public override bool HasObservers { get { System.Reactive.ImmutableList<System.Reactive.IScheduledObserver<T>> observers = _observers; if (observers == null) return false; return observers.Data.Length != 0; } } public override bool IsDisposed { get { lock (_gate) { return _isDisposed; } } } protected ReplayBase() { _observers = System.Reactive.ImmutableList<System.Reactive.IScheduledObserver<T>>.Empty; _isStopped = false; _error = null; } public override void OnNext(T value) { System.Reactive.IScheduledObserver<T>[] array = null; lock (_gate) { CheckDisposed(); if (!_isStopped) { Next(value); Trim(); array = _observers.Data; System.Reactive.IScheduledObserver<T>[] array2 = array; for (int i = 0; i < array2.Length; i++) { array2[i].OnNext(value); } } } if (array != null) { System.Reactive.IScheduledObserver<T>[] array2 = array; for (int i = 0; i < array2.Length; i++) { array2[i].EnsureActive(); } } } public override void OnError(Exception error) { System.Reactive.IScheduledObserver<T>[] array = null; lock (_gate) { CheckDisposed(); if (!_isStopped) { _isStopped = true; _error = error; Trim(); array = _observers.Data; System.Reactive.IScheduledObserver<T>[] array2 = array; for (int i = 0; i < array2.Length; i++) { array2[i].OnError(error); } _observers = System.Reactive.ImmutableList<System.Reactive.IScheduledObserver<T>>.Empty; } } if (array != null) { System.Reactive.IScheduledObserver<T>[] array2 = array; for (int i = 0; i < array2.Length; i++) { array2[i].EnsureActive(); } } } public override void OnCompleted() { System.Reactive.IScheduledObserver<T>[] array = null; lock (_gate) { CheckDisposed(); if (!_isStopped) { _isStopped = true; Trim(); array = _observers.Data; System.Reactive.IScheduledObserver<T>[] array2 = array; for (int i = 0; i < array2.Length; i++) { array2[i].OnCompleted(); } _observers = System.Reactive.ImmutableList<System.Reactive.IScheduledObserver<T>>.Empty; } } if (array != null) { System.Reactive.IScheduledObserver<T>[] array2 = array; for (int i = 0; i < array2.Length; i++) { array2[i].EnsureActive(); } } } public override IDisposable Subscribe(IObserver<T> observer) { System.Reactive.IScheduledObserver<T> scheduledObserver = CreateScheduledObserver(observer); int num = 0; IDisposable result = Disposable.Empty; lock (_gate) { CheckDisposed(); Trim(); num = Replay(scheduledObserver); if (_error != null) { num++; scheduledObserver.OnError(_error); } else if (_isStopped) { num++; scheduledObserver.OnCompleted(); } if (!_isStopped) { result = new Subscription(this, scheduledObserver); _observers = _observers.Add(scheduledObserver); } } scheduledObserver.EnsureActive(num); return result; } public override void Dispose() { lock (_gate) { _isDisposed = true; _observers = null; DisposeCore(); } } protected abstract void DisposeCore(); protected abstract void Next(T value); protected abstract int Replay(IObserver<T> observer); protected abstract void Trim(); protected abstract System.Reactive.IScheduledObserver<T> CreateScheduledObserver(IObserver<T> observer); private void CheckDisposed() { if (_isDisposed) throw new ObjectDisposedException(string.Empty); } private void Unsubscribe(System.Reactive.IScheduledObserver<T> observer) { lock (_gate) { if (!_isDisposed) _observers = _observers.Remove(observer); } } } [System.Runtime.CompilerServices.Nullable(0)] private sealed class ReplayByTime : ReplayBase { private const int InfiniteBufferSize = int.MaxValue; private readonly int _bufferSize; private readonly TimeSpan _window; private readonly IScheduler _scheduler; private readonly IStopwatch _stopwatch; [System.Runtime.CompilerServices.Nullable(new byte[] { 1, 0, 1 })] private readonly Queue<TimeInterval<T>> _queue; public ReplayByTime(int bufferSize, TimeSpan window, IScheduler scheduler) { if (bufferSize < 0) throw new ArgumentOutOfRangeException("bufferSize"); if (window < TimeSpan.Zero) throw new ArgumentOutOfRangeException("window"); _bufferSize = bufferSize; _window = window; if (scheduler == null) throw new ArgumentNullException("scheduler"); _scheduler = scheduler; _stopwatch = _scheduler.StartStopwatch(); _queue = new Queue<TimeInterval<T>>(); } public ReplayByTime(int bufferSize, TimeSpan window) : this(bufferSize, window, System.Reactive.Concurrency.SchedulerDefaults.Iteration) { } public ReplayByTime(IScheduler scheduler) : this(2147483647, TimeSpan.MaxValue, scheduler) { } public ReplayByTime(int bufferSize, IScheduler scheduler) : this(bufferSize, TimeSpan.MaxValue, scheduler) { } public ReplayByTime(TimeSpan window, IScheduler scheduler) : this(2147483647, window, scheduler) { } public ReplayByTime(TimeSpan window) : this(2147483647, window, System.Reactive.Concurrency.SchedulerDefaults.Iteration) { } protected override System.Reactive.IScheduledObserver<T> CreateScheduledObserver(IObserver<T> observer) { return new System.Reactive.ScheduledObserver<T>(_scheduler, observer); } protected override void DisposeCore() { _queue.Clear(); } protected override void Next(T value) { TimeSpan elapsed = _stopwatch.Elapsed; _queue.Enqueue(new TimeInterval<T>(value, elapsed)); } protected override int Replay(IObserver<T> observer) { int count = _queue.Count; foreach (TimeInterval<T> item in _queue) { observer.OnNext(item.Value); } return count; } protected override void Trim() { TimeSpan elapsed = _stopwatch.Elapsed; while (_queue.Count > _bufferSize) { _queue.Dequeue(); } while (_queue.Count > 0 && elapsed.Subtract(_queue.Peek().Interval).CompareTo(_window) > 0) { _queue.Dequeue(); } } } [System.Runtime.CompilerServices.Nullable(0)] private sealed class ReplayOne : ReplayBufferBase { private bool _hasValue; [System.Runtime.CompilerServices.Nullable(2)] private T _value; protected override void Trim() { } protected override void Next(T value) { _hasValue = true; _value = value; } protected override int Replay(IObserver<T> observer) { int result = 0; if (_hasValue) { result = 1; observer.OnNext(_value); } return result; } protected override void DisposeCore() { _value = default(T); } } [System.Runtime.CompilerServices.NullableContext(0)] private sealed class ReplayMany : ReplayManyBase { private readonly int _bufferSize; public ReplayMany(int bufferSize) : base(bufferSize) { _bufferSize = bufferSize; } protected override void Trim() { while (_queue.Count > _bufferSize) { _queue.Dequeue(); } } } [System.Runtime.CompilerServices.NullableContext(0)] private sealed class ReplayAll : ReplayManyBase { public ReplayAll() : base(0) { } protected override void Trim() { } } [System.Runtime.CompilerServices.NullableContext(0)] private abstract class ReplayBufferBase : ReplayBase { [System.Runtime.CompilerServices.NullableContext(1)] protected override System.Reactive.IScheduledObserver<T> CreateScheduledObserver(IObserver<T> observer) { return new System.Reactive.Subjects.FastImmediateObserver<T>(observer); } protected override void DisposeCore() { } } [System.Runtime.CompilerServices.Nullable(0)] private abstract class ReplayManyBase : ReplayBufferBase { protected readonly Queue<T> _queue; protected ReplayManyBase(int queueSize) { _queue = new Queue<T>(Math.Min(queueSize, 64)); } protected override void Next(T value) { _queue.Enqueue(value); } protected override int Replay(IObserver<T> observer) { int count = _queue.Count; foreach (T item in _queue) { observer.OnNext(item); } return count; } protected override void DisposeCore() { _queue.Clear(); } } private readonly SubjectBase<T> _implementation; public override bool HasObservers => _implementation.HasObservers; public override bool IsDisposed => _implementation.IsDisposed; public ReplaySubject() : this(2147483647) { } public ReplaySubject(IScheduler scheduler) { _implementation = new ReplayByTime(scheduler); } public ReplaySubject(int bufferSize) { SubjectBase<T> implementation; switch (bufferSize) { case 1: implementation = new ReplayOne(); break; case int.MaxValue: implementation = new ReplayAll(); break; default: implementation = new ReplayMany(bufferSize); break; } _implementation = implementation; } public ReplaySubject(int bufferSize, IScheduler scheduler) { _implementation = new ReplayByTime(bufferSize, scheduler); } public ReplaySubject(TimeSpan window) { _implementation = new ReplayByTime(window); } public ReplaySubject(TimeSpan window, IScheduler scheduler) { _implementation = new ReplayByTime(window, scheduler); } public ReplaySubject(int bufferSize, TimeSpan window) { _implementation = new ReplayByTime(bufferSize, window); } public ReplaySubject(int bufferSize, TimeSpan window, IScheduler scheduler) { _implementation = new ReplayByTime(bufferSize, window, scheduler); } public override void OnNext(T value) { _implementation.OnNext(value); } public override void OnError(Exception error) { if (error == null) throw new ArgumentNullException("error"); _implementation.OnError(error); } public override void OnCompleted() { _implementation.OnCompleted(); } public override IDisposable Subscribe(IObserver<T> observer) { if (observer == null) throw new ArgumentNullException("observer"); return _implementation.Subscribe(observer); } public override void Dispose() { _implementation.Dispose(); } } }