<PackageReference Include="System.Reactive" Version="4.1.5" />

Buffer<TSource>

static class Buffer<TSource>
using System.Collections.Generic; using System.Reactive.Concurrency; using System.Reactive.Disposables; namespace System.Reactive.Linq.ObservableImpl { internal static class Buffer<TSource> { internal sealed class CountExact : Producer<IList<TSource>, CountExact.ExactSink> { internal sealed class ExactSink : Sink<TSource, IList<TSource>> { private readonly int _count; private int _index; private IList<TSource> _buffer; internal ExactSink(IObserver<IList<TSource>> observer, int count) : base(observer) { _count = count; } public override void OnNext(TSource value) { IList<TSource> list = _buffer; if (list == null) list = (_buffer = new List<TSource>()); list.Add(value); int num = _index + 1; if (num == _count) { _buffer = null; _index = 0; ForwardOnNext(list); } else _index = num; } public override void OnError(Exception error) { _buffer = null; ForwardOnError(error); } public override void OnCompleted() { IList<TSource> buffer = _buffer; _buffer = null; if (buffer != null) ForwardOnNext(buffer); ForwardOnCompleted(); } } private readonly IObservable<TSource> _source; private readonly int _count; public CountExact(IObservable<TSource> source, int count) { _source = source; _count = count; } protected override ExactSink CreateSink(IObserver<IList<TSource>> observer) { return new ExactSink(observer, _count); } protected override void Run(ExactSink sink) { sink.Run(_source); } } internal sealed class CountSkip : Producer<IList<TSource>, CountSkip.SkipSink> { internal sealed class SkipSink : Sink<TSource, IList<TSource>> { private readonly int _count; private readonly int _skip; private int _index; private IList<TSource> _buffer; internal SkipSink(IObserver<IList<TSource>> observer, int count, int skip) : base(observer) { _count = count; _skip = skip; } public override void OnNext(TSource value) { int index = _index; IList<TSource> list = _buffer; if (index == 0) list = (_buffer = new List<TSource>()); list?.Add(value); if (++index == _count) { _buffer = null; ForwardOnNext(list); } if (index == _skip) _index = 0; else _index = index; } public override void OnError(Exception error) { _buffer = null; ForwardOnError(error); } public override void OnCompleted() { IList<TSource> buffer = _buffer; _buffer = null; if (buffer != null) ForwardOnNext(buffer); ForwardOnCompleted(); } } private readonly IObservable<TSource> _source; private readonly int _count; private readonly int _skip; public CountSkip(IObservable<TSource> source, int count, int skip) { _source = source; _count = count; _skip = skip; } protected override SkipSink CreateSink(IObserver<IList<TSource>> observer) { return new SkipSink(observer, _count, _skip); } protected override void Run(SkipSink sink) { sink.Run(_source); } } internal sealed class CountOverlap : Producer<IList<TSource>, CountOverlap.OverlapSink> { internal sealed class OverlapSink : Sink<TSource, IList<TSource>> { private readonly Queue<IList<TSource>> _queue; private readonly int _count; private readonly int _skip; private int _n; public OverlapSink(IObserver<IList<TSource>> observer, int count, int skip) : base(observer) { _queue = new Queue<IList<TSource>>(); _count = count; _skip = skip; CreateWindow(); } private void CreateWindow() { List<TSource> item = new List<TSource>(); _queue.Enqueue(item); } public override void OnNext(TSource value) { foreach (IList<TSource> item in _queue) { item.Add(value); } int num = _n - _count + 1; if (num >= 0 && num % _skip == 0) { IList<TSource> list = _queue.Dequeue(); if (list.Count > 0) ForwardOnNext(list); } _n++; if (_n % _skip == 0) CreateWindow(); } public override void OnError(Exception error) { _queue.Clear(); ForwardOnError(error); } public override void OnCompleted() { while (_queue.Count > 0) { IList<TSource> list = _queue.Dequeue(); if (list.Count > 0) ForwardOnNext(list); } ForwardOnCompleted(); } } private readonly IObservable<TSource> _source; private readonly int _count; private readonly int _skip; public CountOverlap(IObservable<TSource> source, int count, int skip) { _source = source; _count = count; _skip = skip; } protected override OverlapSink CreateSink(IObserver<IList<TSource>> observer) { return new OverlapSink(observer, _count, _skip); } protected override void Run(OverlapSink sink) { sink.Run(_source); } } internal sealed class TimeSliding : Producer<IList<TSource>, TimeSliding._> { internal sealed class _ : Sink<TSource, IList<TSource>> { private readonly TimeSpan _timeShift; private readonly IScheduler _scheduler; private readonly object _gate = new object(); private readonly Queue<List<TSource>> _q = new Queue<List<TSource>>(); private IDisposable _timerSerial; private TimeSpan _totalTime; private TimeSpan _nextShift; private TimeSpan _nextSpan; public _(TimeSliding parent, IObserver<IList<TSource>> observer) : base(observer) { _timeShift = parent._timeShift; _scheduler = parent._scheduler; } public void Run(TimeSliding parent) { _totalTime = TimeSpan.Zero; _nextShift = parent._timeShift; _nextSpan = parent._timeSpan; CreateWindow(); CreateTimer(); Run(parent._source); } protected override void Dispose(bool disposing) { if (disposing) Disposable.TryDispose(ref _timerSerial); base.Dispose(disposing); } private void CreateWindow() { List<TSource> item = new List<TSource>(); _q.Enqueue(item); } private void CreateTimer() { SingleAssignmentDisposable singleAssignmentDisposable = new SingleAssignmentDisposable(); Disposable.TrySetSerial(ref _timerSerial, singleAssignmentDisposable); bool flag = false; bool flag2 = false; if (_nextSpan == _nextShift) { flag = true; flag2 = true; } else if (_nextSpan < _nextShift) { flag = true; } else { flag2 = true; } TimeSpan timeSpan = flag ? _nextSpan : _nextShift; TimeSpan dueTime = timeSpan - _totalTime; _totalTime = timeSpan; if (flag) _nextSpan += _timeShift; if (flag2) _nextShift += _timeShift; singleAssignmentDisposable.Disposable = Scheduler.ScheduleAction<(_, bool, bool)>(_scheduler, (this, flag, flag2), dueTime, (Action<(_, bool, bool)>)delegate((_ this, bool isSpan, bool isShift) tuple) { tuple.this.Tick(tuple.isSpan, tuple.isShift); }); } private void Tick(bool isSpan, bool isShift) { lock (_gate) { if (isSpan) { List<TSource> value = _q.Dequeue(); ForwardOnNext(value); } if (isShift) CreateWindow(); } CreateTimer(); } public override void OnNext(TSource value) { lock (_gate) { foreach (List<TSource> item in _q) { item.Add(value); } } } public override void OnError(Exception error) { lock (_gate) { while (_q.Count > 0) { _q.Dequeue().Clear(); } ForwardOnError(error); } } public override void OnCompleted() { lock (_gate) { while (_q.Count > 0) { ForwardOnNext(_q.Dequeue()); } ForwardOnCompleted(); } } } private readonly IObservable<TSource> _source; private readonly TimeSpan _timeSpan; private readonly TimeSpan _timeShift; private readonly IScheduler _scheduler; public TimeSliding(IObservable<TSource> source, TimeSpan timeSpan, TimeSpan timeShift, IScheduler scheduler) { _source = source; _timeSpan = timeSpan; _timeShift = timeShift; _scheduler = scheduler; } protected override _ CreateSink(IObserver<IList<TSource>> observer) { return new _(this, observer); } protected override void Run(_ sink) { sink.Run(this); } } internal sealed class TimeHopping : Producer<IList<TSource>, TimeHopping._> { internal sealed class _ : Sink<TSource, IList<TSource>> { private readonly object _gate = new object(); private List<TSource> _list; private IDisposable _periodicDisposable; public _(IObserver<IList<TSource>> observer) : base(observer) { } public void Run(TimeHopping parent) { _list = new List<TSource>(); Disposable.SetSingle(ref _periodicDisposable, Scheduler.SchedulePeriodic<_>(parent._scheduler, this, parent._timeSpan, (Action<_>)delegate(_ this) { this.Tick(); })); Run(parent._source); } protected override void Dispose(bool disposing) { if (disposing) Disposable.TryDispose(ref _periodicDisposable); base.Dispose(disposing); } private void Tick() { lock (_gate) { ForwardOnNext(_list); _list = new List<TSource>(); } } public override void OnNext(TSource value) { lock (_gate) { _list.Add(value); } } public override void OnError(Exception error) { lock (_gate) { _list.Clear(); ForwardOnError(error); } } public override void OnCompleted() { lock (_gate) { ForwardOnNext(_list); ForwardOnCompleted(); } } } private readonly IObservable<TSource> _source; private readonly TimeSpan _timeSpan; private readonly IScheduler _scheduler; public TimeHopping(IObservable<TSource> source, TimeSpan timeSpan, IScheduler scheduler) { _source = source; _timeSpan = timeSpan; _scheduler = scheduler; } protected override _ CreateSink(IObserver<IList<TSource>> observer) { return new _(observer); } protected override void Run(_ sink) { sink.Run(this); } } internal sealed class Ferry : Producer<IList<TSource>, Ferry._> { internal sealed class _ : Sink<TSource, IList<TSource>> { private readonly Ferry _parent; private readonly object _gate = new object(); private IDisposable _timerSerial; private IList<TSource> _s; private int _n; private int _windowId; public _(Ferry parent, IObserver<IList<TSource>> observer) : base(observer) { _parent = parent; } public void Run() { _s = new List<TSource>(); _n = 0; _windowId = 0; CreateTimer(0); SetUpstream(ObservableExtensions.SubscribeSafe<TSource>(_parent._source, (IObserver<TSource>)this)); } protected override void Dispose(bool disposing) { if (disposing) Disposable.TryDispose(ref _timerSerial); base.Dispose(disposing); } private void CreateTimer(int id) { SingleAssignmentDisposable singleAssignmentDisposable = new SingleAssignmentDisposable(); Disposable.TrySetSerial(ref _timerSerial, singleAssignmentDisposable); singleAssignmentDisposable.Disposable = Scheduler.ScheduleAction<(_, int)>(_parent._scheduler, (this, id), _parent._timeSpan, (Action<(_, int)>)delegate((_ this, int id) tuple) { tuple.this.Tick(tuple.id); }); } private void Tick(int id) { lock (_gate) { if (id == _windowId) { _n = 0; int id2 = ++_windowId; IList<TSource> s = _s; _s = new List<TSource>(); ForwardOnNext(s); CreateTimer(id2); } } } public override void OnNext(TSource value) { bool flag = false; int id = 0; lock (_gate) { _s.Add(value); _n++; if (_n == _parent._count) { flag = true; _n = 0; id = ++_windowId; IList<TSource> s = _s; _s = new List<TSource>(); ForwardOnNext(s); } if (flag) CreateTimer(id); } } public override void OnError(Exception error) { lock (_gate) { _s.Clear(); ForwardOnError(error); } } public override void OnCompleted() { lock (_gate) { ForwardOnNext(_s); ForwardOnCompleted(); } } } private readonly IObservable<TSource> _source; private readonly int _count; private readonly TimeSpan _timeSpan; private readonly IScheduler _scheduler; public Ferry(IObservable<TSource> source, TimeSpan timeSpan, int count, IScheduler scheduler) { _source = source; _timeSpan = timeSpan; _count = count; _scheduler = scheduler; } protected override _ CreateSink(IObserver<IList<TSource>> observer) { return new _(this, observer); } protected override void Run(_ sink) { sink.Run(); } } } }