// Buffer<TSource>
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
namespace System.Reactive.Linq.ObservableImpl
{
internal static class Buffer<TSource>
{
/// <summary>
/// Producer whose sink gathers source elements into back-to-back lists of <c>count</c> items.
/// </summary>
internal sealed class CountExact : Producer<IList<TSource>, CountExact.ExactSink>
{
    private readonly IObservable<TSource> _source;
    private readonly int _count;

    public CountExact(IObservable<TSource> source, int count)
    {
        _source = source;
        _count = count;
    }

    /// <summary>Creates the sink, handing it the configured buffer size.</summary>
    protected override ExactSink CreateSink(IObserver<IList<TSource>> observer) => new ExactSink(observer, _count);

    /// <summary>Starts the sink on the source sequence.</summary>
    protected override void Run(ExactSink sink) => sink.Run(_source);

    /// <summary>
    /// Sink that appends every incoming value to the current list and forwards that
    /// list as soon as it contains <c>_count</c> values.
    /// </summary>
    internal sealed class ExactSink : Sink<TSource, IList<TSource>>
    {
        private readonly int _count;

        // Number of values already stored in the list being filled.
        private int _index;

        // List being filled; null while no list is open.
        private IList<TSource> _buffer;

        internal ExactSink(IObserver<IList<TSource>> observer, int count)
            : base(observer)
        {
            _count = count;
        }

        /// <summary>
        /// Adds <paramref name="value"/> to the open list (opening one if needed) and
        /// forwards the list when it reaches the configured size.
        /// </summary>
        public override void OnNext(TSource value)
        {
            var current = _buffer;
            if (current == null)
            {
                current = new List<TSource>();
                _buffer = current;
            }

            current.Add(value);

            var filled = _index + 1;
            if (filled != _count)
            {
                // Not full yet: just remember how many values are stored.
                _index = filled;
                return;
            }

            // Full: reset the state first, then hand the list downstream.
            _buffer = null;
            _index = 0;
            ForwardOnNext(current);
        }

        /// <summary>Drops any partially filled list and forwards the error.</summary>
        public override void OnError(Exception error)
        {
            _buffer = null;
            ForwardOnError(error);
        }

        /// <summary>Forwards a partially filled list, if one is open, then forwards completion.</summary>
        public override void OnCompleted()
        {
            var pending = _buffer;
            _buffer = null;

            if (pending != null)
            {
                ForwardOnNext(pending);
            }

            ForwardOnCompleted();
        }
    }
}
/// <summary>
/// Producer whose sink collects <c>count</c> elements into a list at the start of every
/// cycle of <c>skip</c> elements.
/// </summary>
internal sealed class CountSkip : Producer<IList<TSource>, CountSkip.SkipSink>
{
    private readonly IObservable<TSource> _source;
    private readonly int _count;
    private readonly int _skip;

    public CountSkip(IObservable<TSource> source, int count, int skip)
    {
        _source = source;
        _count = count;
        _skip = skip;
    }

    /// <summary>Creates the sink with the configured count and skip values.</summary>
    protected override SkipSink CreateSink(IObserver<IList<TSource>> observer) => new SkipSink(observer, _count, _skip);

    /// <summary>Starts the sink on the source sequence.</summary>
    protected override void Run(SkipSink sink) => sink.Run(_source);

    /// <summary>
    /// Sink that opens a list when its position counter is 0, forwards it once the
    /// counter reaches <c>_count</c>, and restarts the counter when it reaches <c>_skip</c>.
    /// Values that arrive while no list is open are not stored.
    /// </summary>
    internal sealed class SkipSink : Sink<TSource, IList<TSource>>
    {
        private readonly int _count;
        private readonly int _skip;

        // Position within the current cycle of _skip elements.
        private int _index;

        // List being filled; null while no list is open.
        private IList<TSource> _buffer;

        internal SkipSink(IObserver<IList<TSource>> observer, int count, int skip)
            : base(observer)
        {
            _count = count;
            _skip = skip;
        }

        /// <summary>
        /// Stores <paramref name="value"/> in the open list (if any), forwards the list when
        /// it is complete, and advances or resets the cycle position.
        /// </summary>
        public override void OnNext(TSource value)
        {
            var position = _index;
            var current = _buffer;

            if (position == 0)
            {
                // Start of a cycle: open a fresh list.
                current = new List<TSource>();
                _buffer = current;
            }

            if (current != null)
            {
                current.Add(value);
            }

            position++;
            if (position == _count)
            {
                _buffer = null;
                ForwardOnNext(current);
            }

            // The position is written back only after the list has been forwarded.
            _index = position == _skip ? 0 : position;
        }

        /// <summary>Drops any open list and forwards the error.</summary>
        public override void OnError(Exception error)
        {
            _buffer = null;
            ForwardOnError(error);
        }

        /// <summary>Forwards the open list, if any, then forwards completion.</summary>
        public override void OnCompleted()
        {
            var pending = _buffer;
            _buffer = null;

            if (pending != null)
            {
                ForwardOnNext(pending);
            }

            ForwardOnCompleted();
        }
    }
}
/// <summary>
/// Producer whose sink keeps several lists open at once: a new list is opened every
/// <c>skip</c> elements and each list is forwarded once <c>count</c> elements have been seen
/// since it was opened.
/// </summary>
internal sealed class CountOverlap : Producer<IList<TSource>, CountOverlap.OverlapSink>
{
    /// <summary>
    /// Sink that appends every value to all queued lists, dequeues and forwards the oldest
    /// list at the positions determined by <c>_count</c> and <c>_skip</c>, and enqueues a
    /// new list every <c>_skip</c> values.
    /// </summary>
    internal sealed class OverlapSink : Sink<TSource, IList<TSource>>
    {
        // Lists currently being filled, oldest first.
        private readonly Queue<IList<TSource>> _queue;
        private readonly int _count;
        private readonly int _skip;

        // Total number of values received. Kept as a long: this counter is incremented for
        // every value and never reset, so an int would overflow after int.MaxValue values
        // and corrupt the window arithmetic in OnNext.
        private long _n;

        public OverlapSink(IObserver<IList<TSource>> observer, int count, int skip)
            : base(observer)
        {
            _queue = new Queue<IList<TSource>>();
            _count = count;
            _skip = skip;

            // The first list is open before any value arrives.
            CreateWindow();
        }

        /// <summary>Enqueues a new, empty list.</summary>
        private void CreateWindow()
        {
            _queue.Enqueue(new List<TSource>());
        }

        /// <summary>
        /// Adds <paramref name="value"/> to every open list, forwards the oldest list when it
        /// is due, and opens a new list every <c>_skip</c> values.
        /// </summary>
        public override void OnNext(TSource value)
        {
            foreach (var window in _queue)
            {
                window.Add(value);
            }

            // Offset of this value relative to the first position at which a list is due;
            // from then on a list is due every _skip values.
            var offset = _n - _count + 1;
            if (offset >= 0 && offset % _skip == 0)
            {
                var completed = _queue.Dequeue();
                if (completed.Count > 0)
                {
                    ForwardOnNext(completed);
                }
            }

            _n++;
            if (_n % _skip == 0)
            {
                CreateWindow();
            }
        }

        /// <summary>Discards all open lists and forwards the error.</summary>
        public override void OnError(Exception error)
        {
            _queue.Clear();
            ForwardOnError(error);
        }

        /// <summary>Forwards every non-empty open list, oldest first, then forwards completion.</summary>
        public override void OnCompleted()
        {
            while (_queue.Count > 0)
            {
                var remaining = _queue.Dequeue();
                if (remaining.Count > 0)
                {
                    ForwardOnNext(remaining);
                }
            }

            ForwardOnCompleted();
        }
    }

    private readonly IObservable<TSource> _source;
    private readonly int _count;
    private readonly int _skip;

    public CountOverlap(IObservable<TSource> source, int count, int skip)
    {
        _source = source;
        _count = count;
        _skip = skip;
    }

    /// <summary>Creates the sink with the configured count and skip values.</summary>
    protected override OverlapSink CreateSink(IObserver<IList<TSource>> observer)
    {
        return new OverlapSink(observer, _count, _skip);
    }

    /// <summary>Starts the sink on the source sequence.</summary>
    protected override void Run(OverlapSink sink)
    {
        sink.Run(_source);
    }
}
/// <summary>
/// Producer whose sink buffers elements into time-based lists: a list is opened every
/// <c>timeShift</c> and closed (forwarded) <c>timeSpan</c> after it was opened, with all
/// timing driven by the supplied scheduler.
/// </summary>
internal sealed class TimeSliding : Producer<IList<TSource>, TimeSliding._>
{
    /// <summary>
    /// Sink that adds each value to every open list and uses a rescheduled one-shot timer
    /// to close the oldest list and/or open a new one.
    /// </summary>
    internal sealed class _ : Sink<TSource, IList<TSource>>
    {
        private readonly TimeSpan _timeShift;
        private readonly IScheduler _scheduler;

        // Guards _q and serializes forwarding between timer ticks and source notifications.
        private readonly object _gate = new object();

        // Lists currently being filled, oldest first.
        private readonly Queue<List<TSource>> _q = new Queue<List<TSource>>();

        // Holds the currently scheduled timer; replaced on every CreateTimer call.
        private IDisposable _timerSerial;

        // Offset (from the start) of the most recently scheduled timer.
        private TimeSpan _totalTime;

        // Offsets at which the next list is opened / the next list is closed.
        private TimeSpan _nextShift;
        private TimeSpan _nextSpan;

        public _(TimeSliding parent, IObserver<IList<TSource>> observer)
            : base(observer)
        {
            _timeShift = parent._timeShift;
            _scheduler = parent._scheduler;
        }

        /// <summary>
        /// Initializes the timing state, opens the first list, schedules the first timer and
        /// then runs the sink on the parent's source.
        /// </summary>
        public void Run(TimeSliding parent)
        {
            _totalTime = TimeSpan.Zero;
            _nextShift = parent._timeShift;
            _nextSpan = parent._timeSpan;

            CreateWindow();
            CreateTimer();

            Run(parent._source);
        }

        /// <summary>Disposes the pending timer before delegating to the base implementation.</summary>
        protected override void Dispose(bool disposing)
        {
            if (disposing)
            {
                Disposable.TryDispose(ref _timerSerial);
            }

            base.Dispose(disposing);
        }

        /// <summary>Enqueues a new, empty list.</summary>
        private void CreateWindow()
        {
            _q.Enqueue(new List<TSource>());
        }

        /// <summary>
        /// Schedules the next timer at whichever of <c>_nextSpan</c> / <c>_nextShift</c> comes
        /// first (both when they coincide) and advances the corresponding offset(s) by
        /// <c>_timeShift</c>.
        /// </summary>
        private void CreateTimer()
        {
            var timer = new SingleAssignmentDisposable();
            Disposable.TrySetSerial(ref _timerSerial, timer);

            // The next tick closes a list when the span boundary is not later than the shift
            // boundary, and opens one when the shift boundary is not later than the span one.
            var isSpan = _nextSpan <= _nextShift;
            var isShift = _nextShift <= _nextSpan;

            var nextOffset = isSpan ? _nextSpan : _nextShift;
            var dueTime = nextOffset - _totalTime;
            _totalTime = nextOffset;

            if (isSpan)
            {
                _nextSpan += _timeShift;
            }

            if (isShift)
            {
                _nextShift += _timeShift;
            }

            // 'this' is a keyword, so the tuple element carrying the sink is named '@this'.
            timer.Disposable = Scheduler.ScheduleAction<(_ @this, bool isSpan, bool isShift)>(
                _scheduler,
                (this, isSpan, isShift),
                dueTime,
                state => state.@this.Tick(state.isSpan, state.isShift));
        }

        /// <summary>
        /// Timer callback: forwards the oldest list when <paramref name="isSpan"/> is set,
        /// opens a new list when <paramref name="isShift"/> is set, then schedules the next timer.
        /// </summary>
        private void Tick(bool isSpan, bool isShift)
        {
            lock (_gate)
            {
                // OnError/OnCompleted empty _q under this same gate. A tick that acquires the
                // gate after one of them must not call Dequeue on the empty queue, which
                // would throw InvalidOperationException.
                if (isSpan && _q.Count > 0)
                {
                    ForwardOnNext(_q.Dequeue());
                }

                if (isShift)
                {
                    CreateWindow();
                }
            }

            CreateTimer();
        }

        /// <summary>Adds <paramref name="value"/> to every open list.</summary>
        public override void OnNext(TSource value)
        {
            lock (_gate)
            {
                foreach (var window in _q)
                {
                    window.Add(value);
                }
            }
        }

        /// <summary>Clears and discards all open lists, then forwards the error.</summary>
        public override void OnError(Exception error)
        {
            lock (_gate)
            {
                while (_q.Count > 0)
                {
                    _q.Dequeue().Clear();
                }

                ForwardOnError(error);
            }
        }

        /// <summary>Forwards all open lists, oldest first, then forwards completion.</summary>
        public override void OnCompleted()
        {
            lock (_gate)
            {
                while (_q.Count > 0)
                {
                    ForwardOnNext(_q.Dequeue());
                }

                ForwardOnCompleted();
            }
        }
    }

    private readonly IObservable<TSource> _source;
    private readonly TimeSpan _timeSpan;
    private readonly TimeSpan _timeShift;
    private readonly IScheduler _scheduler;

    public TimeSliding(IObservable<TSource> source, TimeSpan timeSpan, TimeSpan timeShift, IScheduler scheduler)
    {
        _source = source;
        _timeSpan = timeSpan;
        _timeShift = timeShift;
        _scheduler = scheduler;
    }

    /// <summary>Creates the sink for this producer.</summary>
    protected override _ CreateSink(IObserver<IList<TSource>> observer)
    {
        return new _(this, observer);
    }

    /// <summary>Starts the sink, passing this producer for its configuration.</summary>
    protected override void Run(_ sink)
    {
        sink.Run(this);
    }
}
/// <summary>
/// Producer whose sink collects elements into a list and forwards that list on every
/// period of <c>timeSpan</c>, as scheduled on the supplied scheduler.
/// </summary>
internal sealed class TimeHopping : Producer<IList<TSource>, TimeHopping._>
{
    /// <summary>
    /// Sink that appends values to the current list and swaps in a fresh list on each
    /// periodic tick.
    /// </summary>
    internal sealed class _ : Sink<TSource, IList<TSource>>
    {
        // Guards _list and serializes forwarding between ticks and source notifications.
        private readonly object _gate = new object();

        // List currently being filled.
        private List<TSource> _list;

        // Handle of the periodic timer.
        private IDisposable _periodicDisposable;

        public _(IObserver<IList<TSource>> observer)
            : base(observer)
        {
        }

        /// <summary>
        /// Creates the first list, starts the periodic timer and then runs the sink on the
        /// parent's source.
        /// </summary>
        public void Run(TimeHopping parent)
        {
            _list = new List<TSource>();

            // 'this' is a keyword, so the callback parameter carrying the sink is named '@this'.
            Disposable.SetSingle(
                ref _periodicDisposable,
                Scheduler.SchedulePeriodic<_>(
                    parent._scheduler,
                    this,
                    parent._timeSpan,
                    (Action<_>)(@this => @this.Tick())));

            Run(parent._source);
        }

        /// <summary>Disposes the periodic timer before delegating to the base implementation.</summary>
        protected override void Dispose(bool disposing)
        {
            if (disposing)
            {
                Disposable.TryDispose(ref _periodicDisposable);
            }

            base.Dispose(disposing);
        }

        /// <summary>Timer callback: forwards the current list and replaces it with a new one.</summary>
        private void Tick()
        {
            lock (_gate)
            {
                ForwardOnNext(_list);
                _list = new List<TSource>();
            }
        }

        /// <summary>Adds <paramref name="value"/> to the current list.</summary>
        public override void OnNext(TSource value)
        {
            lock (_gate)
            {
                _list.Add(value);
            }
        }

        /// <summary>Clears the current list and forwards the error.</summary>
        public override void OnError(Exception error)
        {
            lock (_gate)
            {
                _list.Clear();
                ForwardOnError(error);
            }
        }

        /// <summary>Forwards the current list (even if empty), then forwards completion.</summary>
        public override void OnCompleted()
        {
            lock (_gate)
            {
                ForwardOnNext(_list);
                ForwardOnCompleted();
            }
        }
    }

    private readonly IObservable<TSource> _source;
    private readonly TimeSpan _timeSpan;
    private readonly IScheduler _scheduler;

    public TimeHopping(IObservable<TSource> source, TimeSpan timeSpan, IScheduler scheduler)
    {
        _source = source;
        _timeSpan = timeSpan;
        _scheduler = scheduler;
    }

    /// <summary>Creates the sink for this producer.</summary>
    protected override _ CreateSink(IObserver<IList<TSource>> observer)
    {
        return new _(observer);
    }

    /// <summary>Starts the sink, passing this producer for its configuration.</summary>
    protected override void Run(_ sink)
    {
        sink.Run(this);
    }
}
/// <summary>
/// Producer whose sink forwards the current list either when it holds <c>count</c> elements
/// or when <c>timeSpan</c> has elapsed since the list was opened, whichever happens first.
/// </summary>
internal sealed class Ferry : Producer<IList<TSource>, Ferry._>
{
    /// <summary>
    /// Sink that fills one list at a time and closes it on a count threshold or on a
    /// timer tick belonging to that list.
    /// </summary>
    internal sealed class _ : Sink<TSource, IList<TSource>>
    {
        private readonly Ferry _parent;

        // Guards all mutable state below and serializes forwarding.
        private readonly object _gate = new object();

        // Holds the currently scheduled timer; replaced on every CreateTimer call.
        private IDisposable _timerSerial;

        // List currently being filled.
        private IList<TSource> _s;

        // Number of values in the current list.
        private int _n;

        // Identifier of the current list; lets Tick ignore timers scheduled for an earlier list.
        private int _windowId;

        public _(Ferry parent, IObserver<IList<TSource>> observer)
            : base(observer)
        {
            _parent = parent;
        }

        /// <summary>
        /// Opens the first list, schedules its timer and subscribes to the parent's source.
        /// </summary>
        public void Run()
        {
            _s = new List<TSource>();
            _n = 0;
            _windowId = 0;

            CreateTimer(0);

            SetUpstream(ObservableExtensions.SubscribeSafe<TSource>(_parent._source, (IObserver<TSource>)this));
        }

        /// <summary>Disposes the pending timer before delegating to the base implementation.</summary>
        protected override void Dispose(bool disposing)
        {
            if (disposing)
            {
                Disposable.TryDispose(ref _timerSerial);
            }

            base.Dispose(disposing);
        }

        /// <summary>
        /// Schedules a one-shot timer of <c>_parent._timeSpan</c> for the list identified by
        /// <paramref name="id"/>, replacing the previously scheduled timer.
        /// </summary>
        private void CreateTimer(int id)
        {
            var timer = new SingleAssignmentDisposable();
            Disposable.TrySetSerial(ref _timerSerial, timer);

            // 'this' is a keyword, so the tuple element carrying the sink is named '@this'.
            timer.Disposable = Scheduler.ScheduleAction<(_ @this, int id)>(
                _parent._scheduler,
                (this, id),
                _parent._timeSpan,
                state => state.@this.Tick(state.id));
        }

        /// <summary>
        /// Timer callback: if <paramref name="id"/> still identifies the current list, forwards
        /// that list, opens a new one and schedules its timer. Stale ticks are ignored.
        /// </summary>
        private void Tick(int id)
        {
            lock (_gate)
            {
                if (id != _windowId)
                {
                    return;
                }

                _n = 0;
                var newId = ++_windowId;

                var closed = _s;
                _s = new List<TSource>();
                ForwardOnNext(closed);

                CreateTimer(newId);
            }
        }

        /// <summary>
        /// Adds <paramref name="value"/> to the current list; when the list reaches
        /// <c>_parent._count</c> values it is forwarded, a new list is opened and the timer
        /// is restarted for the new list.
        /// </summary>
        public override void OnNext(TSource value)
        {
            lock (_gate)
            {
                _s.Add(value);
                _n++;

                if (_n == _parent._count)
                {
                    _n = 0;
                    var newId = ++_windowId;

                    var closed = _s;
                    _s = new List<TSource>();
                    ForwardOnNext(closed);

                    CreateTimer(newId);
                }
            }
        }

        /// <summary>Clears the current list and forwards the error.</summary>
        public override void OnError(Exception error)
        {
            lock (_gate)
            {
                _s.Clear();
                ForwardOnError(error);
            }
        }

        /// <summary>Forwards the current list (even if empty), then forwards completion.</summary>
        public override void OnCompleted()
        {
            lock (_gate)
            {
                ForwardOnNext(_s);
                ForwardOnCompleted();
            }
        }
    }

    private readonly IObservable<TSource> _source;
    private readonly int _count;
    private readonly TimeSpan _timeSpan;
    private readonly IScheduler _scheduler;

    public Ferry(IObservable<TSource> source, TimeSpan timeSpan, int count, IScheduler scheduler)
    {
        _source = source;
        _timeSpan = timeSpan;
        _count = count;
        _scheduler = scheduler;
    }

    /// <summary>Creates the sink for this producer.</summary>
    protected override _ CreateSink(IObserver<IList<TSource>> observer)
    {
        return new _(this, observer);
    }

    /// <summary>Starts the sink.</summary>
    protected override void Run(_ sink)
    {
        sink.Run();
    }
}
}
}