// TakeLast<TSource>
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
namespace System.Reactive.Linq.ObservableImpl
{
internal static class TakeLast<TSource>
{
/// <summary>
/// Producer that retains at most <c>count</c> of the most recently received source
/// elements and, once the source completes, replays them to the observer through
/// the loop scheduler before signalling completion.
/// </summary>
internal sealed class Count : Producer<TSource, Count._>
{
    private readonly IObservable<TSource> _source;
    private readonly int _count;
    private readonly IScheduler _loopScheduler;

    /// <summary>Captures the operator arguments for later subscriptions.</summary>
    /// <param name="source">Sequence whose trailing elements are retained.</param>
    /// <param name="count">Maximum number of trailing elements kept in the buffer.</param>
    /// <param name="loopScheduler">Scheduler used to drain the buffer after the source completes.</param>
    public Count(IObservable<TSource> source, int count, IScheduler loopScheduler)
    {
        _source = source;
        _count = count;
        _loopScheduler = loopScheduler;
    }

    /// <summary>Creates the sink that buffers elements for a single subscription.</summary>
    protected override _ CreateSink(IObserver<TSource> observer, IDisposable cancel)
    {
        return new _(this, observer, cancel);
    }

    /// <summary>Starts the given sink and returns its subscription handle.</summary>
    protected override IDisposable Run(_ sink)
    {
        return sink.Run();
    }

    /// <summary>
    /// Per-subscription sink: buffers incoming elements in a bounded queue and
    /// drains that queue to the downstream observer when the source completes.
    /// </summary>
    internal sealed class _ : Sink<TSource>, IObserver<TSource>
    {
        private readonly Count _parent;
        private readonly Queue<TSource> _queue;
        private SingleAssignmentDisposable _subscription;
        private SingleAssignmentDisposable _loop;

        public _(Count parent, IObserver<TSource> observer, IDisposable cancel)
            : base(observer, cancel)
        {
            _parent = parent;
            _queue = new Queue<TSource>();
        }

        /// <summary>
        /// Subscribes to the source and returns a disposable covering both the
        /// source subscription and the (later scheduled) drain work.
        /// </summary>
        public IDisposable Run()
        {
            // Both holders are created before subscribing because OnCompleted
            // touches _subscription and _loop.
            _subscription = new SingleAssignmentDisposable();
            _loop = new SingleAssignmentDisposable();

            _subscription.Disposable = ObservableExtensions.SubscribeSafe<TSource>(_parent._source, (IObserver<TSource>)this);

            return StableCompositeDisposable.Create(_subscription, _loop);
        }

        /// <summary>Appends the element and evicts the oldest one when the buffer exceeds the limit.</summary>
        public void OnNext(TSource value)
        {
            _queue.Enqueue(value);

            bool overCapacity = _queue.Count > _parent._count;
            if (overCapacity)
            {
                _queue.Dequeue();
            }
        }

        /// <summary>Forwards the error downstream and disposes the sink; buffered elements are not emitted.</summary>
        public void OnError(Exception error)
        {
            _observer.OnError(error);
            base.Dispose();
        }

        /// <summary>
        /// Releases the source subscription and schedules the drain of the buffer,
        /// using the long-running form of the loop scheduler when it offers one.
        /// </summary>
        public void OnCompleted()
        {
            _subscription.Dispose();

            ISchedulerLongRunning longRunning = _parent._loopScheduler.AsLongRunning();
            _loop.Disposable = longRunning != null
                ? longRunning.ScheduleLongRunning(Loop)
                : _parent._loopScheduler.Schedule(LoopRec);
        }

        /// <summary>
        /// Recursive drain step: emits one buffered element per invocation and asks
        /// to be invoked again, or completes and disposes once the buffer is empty.
        /// </summary>
        private void LoopRec(Action recurse)
        {
            if (_queue.Count == 0)
            {
                _observer.OnCompleted();
                base.Dispose();
                return;
            }

            _observer.OnNext(_queue.Dequeue());
            recurse();
        }

        /// <summary>
        /// Long-running drain: emits every buffered element in one loop, stopping
        /// early if <paramref name="cancel"/> is disposed, then disposes the sink.
        /// </summary>
        private void Loop(ICancelable cancel)
        {
            // 'remaining' is only decremented after an element was emitted;
            // the break on completion skips the decrement.
            for (int remaining = _queue.Count; !cancel.IsDisposed; remaining--)
            {
                if (remaining == 0)
                {
                    _observer.OnCompleted();
                    break;
                }

                _observer.OnNext(_queue.Dequeue());
            }

            base.Dispose();
        }
    }
}
/// <summary>
/// Producer that retains the source elements received within the trailing
/// <c>duration</c> window (measured with a stopwatch from <c>scheduler</c>) and,
/// once the source completes, replays them to the observer through the loop
/// scheduler before signalling completion.
/// </summary>
internal sealed class Time : Producer<TSource, Time._>
{
    private readonly IObservable<TSource> _source;
    private readonly TimeSpan _duration;
    private readonly IScheduler _scheduler;
    private readonly IScheduler _loopScheduler;

    /// <summary>Captures the operator arguments for later subscriptions.</summary>
    /// <param name="source">Sequence whose trailing elements are retained.</param>
    /// <param name="duration">Length of the trailing time window.</param>
    /// <param name="scheduler">Scheduler whose stopwatch timestamps the elements.</param>
    /// <param name="loopScheduler">Scheduler used to drain the buffer after the source completes.</param>
    public Time(IObservable<TSource> source, TimeSpan duration, IScheduler scheduler, IScheduler loopScheduler)
    {
        _source = source;
        _duration = duration;
        _scheduler = scheduler;
        _loopScheduler = loopScheduler;
    }

    /// <summary>Creates the sink that buffers elements for a single subscription.</summary>
    protected override _ CreateSink(IObserver<TSource> observer, IDisposable cancel)
    {
        return new _(this, observer, cancel);
    }

    /// <summary>Starts the given sink and returns its subscription handle.</summary>
    protected override IDisposable Run(_ sink)
    {
        return sink.Run();
    }

    /// <summary>
    /// Per-subscription sink: timestamps and buffers incoming elements, discards
    /// those that fall out of the time window, and drains the remainder to the
    /// downstream observer when the source completes.
    /// </summary>
    internal sealed class _ : Sink<TSource>, IObserver<TSource>
    {
        private readonly Time _parent;

        // Each entry pairs the element with the stopwatch reading taken on arrival.
        // The type is fully qualified on purpose; do not shorten it.
        private readonly Queue<System.Reactive.TimeInterval<TSource>> _queue;

        private SingleAssignmentDisposable _subscription;
        private SingleAssignmentDisposable _loop;
        private IStopwatch _watch;

        public _(Time parent, IObserver<TSource> observer, IDisposable cancel)
            : base(observer, cancel)
        {
            _parent = parent;
            _queue = new Queue<System.Reactive.TimeInterval<TSource>>();
        }

        /// <summary>
        /// Starts the stopwatch, subscribes to the source and returns a disposable
        /// covering both the source subscription and the (later scheduled) drain work.
        /// </summary>
        public IDisposable Run()
        {
            // Holders and stopwatch are set up before subscribing because the
            // observer callbacks read _watch, _subscription and _loop.
            _subscription = new SingleAssignmentDisposable();
            _loop = new SingleAssignmentDisposable();
            _watch = _parent._scheduler.StartStopwatch();

            _subscription.Disposable = ObservableExtensions.SubscribeSafe<TSource>(_parent._source, (IObserver<TSource>)this);

            return StableCompositeDisposable.Create(_subscription, _loop);
        }

        /// <summary>Buffers the element with its arrival time and drops entries that left the window.</summary>
        public void OnNext(TSource value)
        {
            TimeSpan now = _watch.Elapsed;
            var stamped = new System.Reactive.TimeInterval<TSource>(value, now);

            _queue.Enqueue(stamped);
            Trim(now);
        }

        /// <summary>Forwards the error downstream and disposes the sink; buffered elements are not emitted.</summary>
        public void OnError(Exception error)
        {
            _observer.OnError(error);
            base.Dispose();
        }

        /// <summary>
        /// Releases the source subscription, trims the buffer against the current
        /// stopwatch reading and schedules the drain, using the long-running form
        /// of the loop scheduler when it offers one.
        /// </summary>
        public void OnCompleted()
        {
            _subscription.Dispose();

            Trim(_watch.Elapsed);

            ISchedulerLongRunning longRunning = _parent._loopScheduler.AsLongRunning();
            _loop.Disposable = longRunning != null
                ? longRunning.ScheduleLongRunning(Loop)
                : _parent._loopScheduler.Schedule(LoopRec);
        }

        /// <summary>
        /// Recursive drain step: emits one buffered element per invocation and asks
        /// to be invoked again, or completes and disposes once the buffer is empty.
        /// </summary>
        private void LoopRec(Action recurse)
        {
            if (_queue.Count == 0)
            {
                _observer.OnCompleted();
                base.Dispose();
                return;
            }

            _observer.OnNext(_queue.Dequeue().Value);
            recurse();
        }

        /// <summary>
        /// Long-running drain: emits every buffered element in one loop, stopping
        /// early if <paramref name="cancel"/> is disposed, then disposes the sink.
        /// </summary>
        private void Loop(ICancelable cancel)
        {
            // 'remaining' is only decremented after an element was emitted;
            // the break on completion skips the decrement.
            for (int remaining = _queue.Count; !cancel.IsDisposed; remaining--)
            {
                if (remaining == 0)
                {
                    _observer.OnCompleted();
                    break;
                }

                _observer.OnNext(_queue.Dequeue().Value);
            }

            base.Dispose();
        }

        /// <summary>
        /// Removes entries from the front of the queue whose age relative to
        /// <paramref name="now"/> is at least the configured duration.
        /// </summary>
        private void Trim(TimeSpan now)
        {
            while (_queue.Count > 0)
            {
                TimeSpan age = now - _queue.Peek().Interval;
                if (age < _parent._duration)
                {
                    // Oldest entry is still inside the window; stop trimming.
                    break;
                }

                _queue.Dequeue();
            }
        }
    }
}
}
}