// Buffer<TSource>
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
namespace System.Reactive.Linq.ObservableImpl
{
internal static class Buffer<TSource>
{
/// <summary>
/// Producer that collects source elements into lists based on element counts.
/// A new list is opened every <c>skip</c> elements, and the oldest open list is
/// handed to the observer once the <c>count</c>-based check in the sink succeeds.
/// </summary>
internal sealed class Count : Producer<IList<TSource>, Count._>
{
    /// <summary>
    /// Sink that observes the source and maintains the queue of open lists.
    /// </summary>
    internal sealed class _ : Sink<IList<TSource>>, IObserver<TSource>
    {
        // Lists that are currently being filled, oldest first.
        private readonly Queue<IList<TSource>> _queue = new Queue<IList<TSource>>();
        private readonly int _count;
        private readonly int _skip;

        // Number of source elements processed so far.
        private int _n;

        public _(Count parent, IObserver<IList<TSource>> observer, IDisposable cancel)
            : base(observer, cancel)
        {
            _skip = parent._skip;
            _count = parent._count;
        }

        /// <summary>
        /// Resets the element counter, opens the first list and subscribes to <paramref name="source"/>.
        /// </summary>
        /// <returns>The subscription to the source.</returns>
        public IDisposable Run(IObservable<TSource> source)
        {
            _n = 0;
            CreateWindow();
            return ObservableExtensions.SubscribeSafe<TSource>(source, this);
        }

        /// <summary>Opens a new, empty list at the back of the queue.</summary>
        private void CreateWindow()
        {
            _queue.Enqueue(new List<TSource>());
        }

        /// <summary>
        /// Appends <paramref name="value"/> to every open list, emits the oldest list when
        /// it is due, and opens a new list on every <c>_skip</c>-th element.
        /// </summary>
        public void OnNext(TSource value)
        {
            foreach (var open in _queue)
            {
                open.Add(value);
            }

            // Non-negative and a multiple of _skip exactly when the oldest list is due.
            var offset = _n - _count + 1;
            if (offset >= 0 && offset % _skip == 0)
            {
                var oldest = _queue.Dequeue();
                if (oldest.Count > 0)
                {
                    _observer.OnNext(oldest);
                }
            }

            if (++_n % _skip == 0)
            {
                CreateWindow();
            }
        }

        /// <summary>
        /// Empties and discards all open lists, then forwards the error and disposes the sink.
        /// </summary>
        public void OnError(Exception error)
        {
            while (_queue.Count != 0)
            {
                var discarded = _queue.Dequeue();
                discarded.Clear();
            }

            _observer.OnError(error);
            base.Dispose();
        }

        /// <summary>
        /// Emits every non-empty list that is still open (oldest first), then forwards
        /// completion and disposes the sink.
        /// </summary>
        public void OnCompleted()
        {
            while (_queue.Count != 0)
            {
                var remaining = _queue.Dequeue();
                if (remaining.Count > 0)
                {
                    _observer.OnNext(remaining);
                }
            }

            _observer.OnCompleted();
            base.Dispose();
        }
    }

    private readonly IObservable<TSource> _source;
    private readonly int _count;
    private readonly int _skip;

    /// <summary>Stores the source sequence and the count/skip settings read by the sink.</summary>
    public Count(IObservable<TSource> source, int count, int skip)
    {
        _source = source;
        _count = count;
        _skip = skip;
    }

    protected override _ CreateSink(IObserver<IList<TSource>> observer, IDisposable cancel)
    {
        return new _(this, observer, cancel);
    }

    protected override IDisposable Run(_ sink)
    {
        return sink.Run(_source);
    }
}
/// <summary>
/// Producer that collects source elements into lists based on timing: a list is opened
/// every <c>timeShift</c> and the oldest open list is emitted when its <c>timeSpan</c>
/// boundary is reached. Timers are scheduled on the supplied scheduler.
/// </summary>
internal sealed class TimeSliding : Producer<IList<TSource>, TimeSliding._>
{
    /// <summary>
    /// Sink that observes the source and drives the open/close timer.
    /// </summary>
    internal sealed class _ : Sink<IList<TSource>>, IObserver<TSource>
    {
        /// <summary>Describes what a scheduled tick has to do when it fires.</summary>
        private struct State
        {
            // True when the tick must close (emit) the oldest list.
            public bool isSpan;
            // True when the tick must open a new list.
            public bool isShift;
        }

        private readonly TimeSpan _timeShift;
        private readonly IScheduler _scheduler;

        // Guards _q and serializes calls to the observer between timer ticks and source notifications.
        private readonly object _gate = new object();

        // Lists that are currently being filled, oldest first.
        private readonly Queue<List<TSource>> _q = new Queue<List<TSource>>();

        // Holds the currently scheduled timer; assigning a new one replaces the previous.
        private readonly SerialDisposable _timerD = new SerialDisposable();

        // Time offset (relative to Run) of the most recently scheduled boundary.
        private TimeSpan _totalTime;
        // Time offset of the next "open a list" boundary.
        private TimeSpan _nextShift;
        // Time offset of the next "close a list" boundary.
        private TimeSpan _nextSpan;

        public _(TimeSliding parent, IObserver<IList<TSource>> observer, IDisposable cancel)
            : base(observer, cancel)
        {
            _timeShift = parent._timeShift;
            _scheduler = parent._scheduler;
        }

        /// <summary>
        /// Initializes the boundaries, opens the first list, schedules the first timer and
        /// subscribes to the source.
        /// </summary>
        /// <returns>A disposable combining the timer and the source subscription.</returns>
        public IDisposable Run(TimeSliding parent)
        {
            _totalTime = TimeSpan.Zero;
            _nextShift = parent._timeShift;
            _nextSpan = parent._timeSpan;

            CreateWindow();
            CreateTimer();

            IDisposable subscription = ObservableExtensions.SubscribeSafe<TSource>(parent._source, this);
            return StableCompositeDisposable.Create(_timerD, subscription);
        }

        /// <summary>Opens a new, empty list at the back of the queue.</summary>
        private void CreateWindow()
        {
            _q.Enqueue(new List<TSource>());
        }

        /// <summary>
        /// Schedules the next tick at whichever boundary (close or open) comes first and
        /// advances the boundary or boundaries that the tick will handle.
        /// </summary>
        private void CreateTimer()
        {
            var timer = new SingleAssignmentDisposable();
            _timerD.Disposable = timer;

            // When both boundaries coincide the tick does both jobs.
            bool closeWindow = _nextSpan <= _nextShift;
            bool openWindow = _nextShift <= _nextSpan;

            TimeSpan boundary = closeWindow ? _nextSpan : _nextShift;
            TimeSpan dueTime = boundary - _totalTime;
            _totalTime = boundary;

            if (closeWindow)
            {
                _nextSpan += _timeShift;
            }
            if (openWindow)
            {
                _nextShift += _timeShift;
            }

            var state = new State { isSpan = closeWindow, isShift = openWindow };
            timer.Disposable = _scheduler.Schedule<State>(state, dueTime, Tick);
        }

        /// <summary>
        /// Timer callback: emits the oldest list and/or opens a new one as requested by
        /// <paramref name="state"/>, then schedules the following tick.
        /// </summary>
        private IDisposable Tick(IScheduler self, State state)
        {
            lock (_gate)
            {
                // OnError/OnCompleted drain _q under this same gate. A tick that was already
                // waiting for the gate when the sequence terminated would otherwise call
                // Dequeue on an empty queue and throw InvalidOperationException.
                if (state.isSpan && _q.Count > 0)
                {
                    List<TSource> oldest = _q.Dequeue();
                    _observer.OnNext(oldest);
                }

                if (state.isShift)
                {
                    CreateWindow();
                }
            }

            CreateTimer();
            return Disposable.Empty;
        }

        /// <summary>Appends <paramref name="value"/> to every list that is currently open.</summary>
        public void OnNext(TSource value)
        {
            lock (_gate)
            {
                foreach (List<TSource> open in _q)
                {
                    open.Add(value);
                }
            }
        }

        /// <summary>
        /// Empties and discards all open lists, then forwards the error and disposes the sink.
        /// </summary>
        public void OnError(Exception error)
        {
            lock (_gate)
            {
                while (_q.Count > 0)
                {
                    _q.Dequeue().Clear();
                }

                _observer.OnError(error);
                base.Dispose();
            }
        }

        /// <summary>
        /// Emits every list that is still open (oldest first, including empty ones), then
        /// forwards completion and disposes the sink.
        /// </summary>
        public void OnCompleted()
        {
            lock (_gate)
            {
                while (_q.Count > 0)
                {
                    _observer.OnNext(_q.Dequeue());
                }

                _observer.OnCompleted();
                base.Dispose();
            }
        }
    }

    private readonly IObservable<TSource> _source;
    private readonly TimeSpan _timeSpan;
    private readonly TimeSpan _timeShift;
    private readonly IScheduler _scheduler;

    /// <summary>Stores the source sequence, the span/shift durations and the scheduler read by the sink.</summary>
    public TimeSliding(IObservable<TSource> source, TimeSpan timeSpan, TimeSpan timeShift, IScheduler scheduler)
    {
        _source = source;
        _timeSpan = timeSpan;
        _timeShift = timeShift;
        _scheduler = scheduler;
    }

    protected override _ CreateSink(IObserver<IList<TSource>> observer, IDisposable cancel)
    {
        return new _(this, observer, cancel);
    }

    protected override IDisposable Run(_ sink)
    {
        return sink.Run(this);
    }
}
/// <summary>
/// Producer that collects source elements into a single list which is emitted and
/// replaced on every periodic tick of length <c>timeSpan</c>.
/// </summary>
internal sealed class TimeHopping : Producer<IList<TSource>, TimeHopping._>
{
    /// <summary>
    /// Sink that observes the source and reacts to the periodic timer.
    /// </summary>
    internal sealed class _ : Sink<IList<TSource>>, IObserver<TSource>
    {
        // Guards _list and serializes calls to the observer between ticks and source notifications.
        private readonly object _gate = new object();

        // The list currently being filled.
        private List<TSource> _list;

        public _(IObserver<IList<TSource>> observer, IDisposable cancel)
            : base(observer, cancel)
        {
        }

        /// <summary>
        /// Creates the first list, starts the periodic timer and subscribes to the source.
        /// </summary>
        /// <returns>A disposable combining the timer and the source subscription.</returns>
        public IDisposable Run(TimeHopping parent)
        {
            _list = new List<TSource>();

            IDisposable timer = parent._scheduler.SchedulePeriodic(parent._timeSpan, Tick);
            IDisposable subscription = ObservableExtensions.SubscribeSafe<TSource>(parent._source, this);

            return StableCompositeDisposable.Create(timer, subscription);
        }

        /// <summary>Periodic callback: emits the current list and starts a fresh one.</summary>
        private void Tick()
        {
            lock (_gate)
            {
                // Emit first, replace afterwards: this order is kept deliberately.
                _observer.OnNext(_list);
                _list = new List<TSource>();
            }
        }

        /// <summary>Appends <paramref name="value"/> to the current list.</summary>
        public void OnNext(TSource value)
        {
            lock (_gate)
            {
                _list.Add(value);
            }
        }

        /// <summary>Empties the current list, forwards the error and disposes the sink.</summary>
        public void OnError(Exception error)
        {
            lock (_gate)
            {
                _list.Clear();
                _observer.OnError(error);
                base.Dispose();
            }
        }

        /// <summary>
        /// Emits the current list (even when empty), forwards completion and disposes the sink.
        /// </summary>
        public void OnCompleted()
        {
            lock (_gate)
            {
                _observer.OnNext(_list);
                _observer.OnCompleted();
                base.Dispose();
            }
        }
    }

    private readonly IObservable<TSource> _source;
    private readonly TimeSpan _timeSpan;
    private readonly IScheduler _scheduler;

    /// <summary>Stores the source sequence, the tick period and the scheduler read by the sink.</summary>
    public TimeHopping(IObservable<TSource> source, TimeSpan timeSpan, IScheduler scheduler)
    {
        _source = source;
        _timeSpan = timeSpan;
        _scheduler = scheduler;
    }

    protected override _ CreateSink(IObserver<IList<TSource>> observer, IDisposable cancel)
    {
        return new _(observer, cancel);
    }

    protected override IDisposable Run(_ sink)
    {
        return sink.Run(this);
    }
}
/// <summary>
/// Producer that collects source elements into a list which is emitted either when it
/// holds <c>count</c> elements or when the <c>timeSpan</c> timer for that list fires,
/// whichever happens first.
/// </summary>
internal sealed class Ferry : Producer<IList<TSource>, Ferry._>
{
    /// <summary>
    /// Sink that observes the source and manages the per-list timer.
    /// </summary>
    internal sealed class _ : Sink<IList<TSource>>, IObserver<TSource>
    {
        private readonly Ferry _parent;

        // Guards the fields below and serializes calls to the observer.
        private readonly object _gate = new object();

        // Holds the timer of the current list; assigning a new one replaces the previous.
        private readonly SerialDisposable _timerD = new SerialDisposable();

        // The list currently being filled.
        private IList<TSource> _s;
        // Number of elements added to the current list.
        private int _n;
        // Identifier of the current list; a tick carrying a different id is ignored.
        private int _windowId;

        public _(Ferry parent, IObserver<IList<TSource>> observer, IDisposable cancel)
            : base(observer, cancel)
        {
            _parent = parent;
        }

        /// <summary>
        /// Creates the first list, schedules its timer and subscribes to the source.
        /// </summary>
        /// <returns>A disposable combining the timer and the source subscription.</returns>
        public IDisposable Run()
        {
            _s = new List<TSource>();
            _n = 0;
            _windowId = 0;

            CreateTimer(0);

            IDisposable subscription = ObservableExtensions.SubscribeSafe<TSource>(_parent._source, this);
            return StableCompositeDisposable.Create(_timerD, subscription);
        }

        /// <summary>
        /// Schedules a tick for the list identified by <paramref name="id"/>, replacing any
        /// previously scheduled timer.
        /// </summary>
        private void CreateTimer(int id)
        {
            var timer = new SingleAssignmentDisposable();
            _timerD.Disposable = timer;
            timer.Disposable = _parent._scheduler.Schedule<int>(id, _parent._timeSpan, Tick);
        }

        /// <summary>
        /// Timer callback: if <paramref name="id"/> still identifies the current list, emits
        /// that list, starts a new one and schedules the new list's timer.
        /// </summary>
        private IDisposable Tick(IScheduler self, int id)
        {
            lock (_gate)
            {
                if (id == _windowId)
                {
                    _n = 0;
                    int nextId = ++_windowId;

                    IList<TSource> full = _s;
                    _s = new List<TSource>();
                    _observer.OnNext(full);

                    CreateTimer(nextId);
                }
            }

            return Disposable.Empty;
        }

        /// <summary>
        /// Appends <paramref name="value"/> to the current list; when the list reaches the
        /// configured count it is emitted, replaced, and a timer for the new list is scheduled.
        /// </summary>
        public void OnNext(TSource value)
        {
            lock (_gate)
            {
                _s.Add(value);
                _n++;

                if (_n == _parent._count)
                {
                    _n = 0;
                    int nextId = ++_windowId;

                    IList<TSource> full = _s;
                    _s = new List<TSource>();
                    _observer.OnNext(full);

                    CreateTimer(nextId);
                }
            }
        }

        /// <summary>Empties the current list, forwards the error and disposes the sink.</summary>
        public void OnError(Exception error)
        {
            lock (_gate)
            {
                _s.Clear();
                _observer.OnError(error);
                base.Dispose();
            }
        }

        /// <summary>
        /// Emits the current list (even when empty), forwards completion and disposes the sink.
        /// </summary>
        public void OnCompleted()
        {
            lock (_gate)
            {
                _observer.OnNext(_s);
                _observer.OnCompleted();
                base.Dispose();
            }
        }
    }

    private readonly IObservable<TSource> _source;
    private readonly int _count;
    private readonly TimeSpan _timeSpan;
    private readonly IScheduler _scheduler;

    /// <summary>Stores the source sequence, the time/count limits and the scheduler read by the sink.</summary>
    public Ferry(IObservable<TSource> source, TimeSpan timeSpan, int count, IScheduler scheduler)
    {
        _source = source;
        _timeSpan = timeSpan;
        _count = count;
        _scheduler = scheduler;
    }

    protected override _ CreateSink(IObserver<IList<TSource>> observer, IDisposable cancel)
    {
        return new _(this, observer, cancel);
    }

    protected override IDisposable Run(_ sink)
    {
        return sink.Run();
    }
}
}
}