<PackageReference Include="System.Reactive" Version="4.0.0" />

Buffer<TSource>

static class Buffer<TSource>
using System.Collections.Generic; using System.Reactive.Concurrency; using System.Reactive.Disposables; namespace System.Reactive.Linq.ObservableImpl { internal static class Buffer<TSource> { internal sealed class Count : Producer<IList<TSource>, Count._> { internal sealed class _ : Sink<IList<TSource>>, IObserver<TSource> { private readonly Queue<IList<TSource>> _queue = new Queue<IList<TSource>>(); private readonly int _count; private readonly int _skip; private int _n; public _(Count parent, IObserver<IList<TSource>> observer, IDisposable cancel) : base(observer, cancel) { _count = parent._count; _skip = parent._skip; } public IDisposable Run(IObservable<TSource> source) { _n = 0; CreateWindow(); return ObservableExtensions.SubscribeSafe<TSource>(source, (IObserver<TSource>)this); } private void CreateWindow() { List<TSource> item = new List<TSource>(); _queue.Enqueue(item); } public void OnNext(TSource value) { foreach (IList<TSource> item in _queue) { item.Add(value); } int num = _n - _count + 1; if (num >= 0 && num % _skip == 0) { IList<TSource> list = _queue.Dequeue(); if (list.Count > 0) _observer.OnNext(list); } _n++; if (_n % _skip == 0) CreateWindow(); } public void OnError(Exception error) { while (_queue.Count > 0) { _queue.Dequeue().Clear(); } _observer.OnError(error); base.Dispose(); } public void OnCompleted() { while (_queue.Count > 0) { IList<TSource> list = _queue.Dequeue(); if (list.Count > 0) _observer.OnNext(list); } _observer.OnCompleted(); base.Dispose(); } } private readonly IObservable<TSource> _source; private readonly int _count; private readonly int _skip; public Count(IObservable<TSource> source, int count, int skip) { _source = source; _count = count; _skip = skip; } protected override _ CreateSink(IObserver<IList<TSource>> observer, IDisposable cancel) { return new _(this, observer, cancel); } protected override IDisposable Run(_ sink) { return sink.Run(_source); } } internal sealed class TimeSliding : Producer<IList<TSource>, TimeSliding._> { internal sealed class _ : Sink<IList<TSource>>, IObserver<TSource> { private struct State { public bool isSpan; public bool isShift; } private readonly TimeSpan _timeShift; private readonly IScheduler _scheduler; private readonly object _gate = new object(); private readonly Queue<List<TSource>> _q = new Queue<List<TSource>>(); private readonly SerialDisposable _timerD = new SerialDisposable(); private TimeSpan _totalTime; private TimeSpan _nextShift; private TimeSpan _nextSpan; public _(TimeSliding parent, IObserver<IList<TSource>> observer, IDisposable cancel) : base(observer, cancel) { _timeShift = parent._timeShift; _scheduler = parent._scheduler; } public IDisposable Run(TimeSliding parent) { _totalTime = TimeSpan.Zero; _nextShift = parent._timeShift; _nextSpan = parent._timeSpan; CreateWindow(); CreateTimer(); IDisposable disposable = ObservableExtensions.SubscribeSafe<TSource>(parent._source, (IObserver<TSource>)this); return StableCompositeDisposable.Create(_timerD, disposable); } private void CreateWindow() { List<TSource> item = new List<TSource>(); _q.Enqueue(item); } private void CreateTimer() { SingleAssignmentDisposable singleAssignmentDisposable = new SingleAssignmentDisposable(); _timerD.Disposable = singleAssignmentDisposable; bool flag = false; bool flag2 = false; if (_nextSpan == _nextShift) { flag = true; flag2 = true; } else if (_nextSpan < _nextShift) { flag = true; } else { flag2 = true; } TimeSpan timeSpan = flag ? _nextSpan : _nextShift; TimeSpan dueTime = timeSpan - _totalTime; _totalTime = timeSpan; if (flag) _nextSpan += _timeShift; if (flag2) _nextShift += _timeShift; singleAssignmentDisposable.Disposable = _scheduler.Schedule<State>(new State { isSpan = flag, isShift = flag2 }, dueTime, (Func<IScheduler, State, IDisposable>)Tick); } private IDisposable Tick(IScheduler self, State state) { lock (_gate) { if (state.isSpan) { List<TSource> value = _q.Dequeue(); _observer.OnNext(value); } if (state.isShift) CreateWindow(); } CreateTimer(); return Disposable.Empty; } public void OnNext(TSource value) { lock (_gate) { foreach (List<TSource> item in _q) { item.Add(value); } } } public void OnError(Exception error) { lock (_gate) { while (_q.Count > 0) { _q.Dequeue().Clear(); } _observer.OnError(error); base.Dispose(); } } public void OnCompleted() { lock (_gate) { while (_q.Count > 0) { _observer.OnNext(_q.Dequeue()); } _observer.OnCompleted(); base.Dispose(); } } } private readonly IObservable<TSource> _source; private readonly TimeSpan _timeSpan; private readonly TimeSpan _timeShift; private readonly IScheduler _scheduler; public TimeSliding(IObservable<TSource> source, TimeSpan timeSpan, TimeSpan timeShift, IScheduler scheduler) { _source = source; _timeSpan = timeSpan; _timeShift = timeShift; _scheduler = scheduler; } protected override _ CreateSink(IObserver<IList<TSource>> observer, IDisposable cancel) { return new _(this, observer, cancel); } protected override IDisposable Run(_ sink) { return sink.Run(this); } } internal sealed class TimeHopping : Producer<IList<TSource>, TimeHopping._> { internal sealed class _ : Sink<IList<TSource>>, IObserver<TSource> { private readonly object _gate = new object(); private List<TSource> _list; public _(IObserver<IList<TSource>> observer, IDisposable cancel) : base(observer, cancel) { } public IDisposable Run(TimeHopping parent) { _list = new List<TSource>(); IDisposable disposable = parent._scheduler.SchedulePeriodic(parent._timeSpan, Tick); IDisposable disposable2 = ObservableExtensions.SubscribeSafe<TSource>(parent._source, (IObserver<TSource>)this); return StableCompositeDisposable.Create(disposable, disposable2); } private void Tick() { lock (_gate) { _observer.OnNext(_list); _list = new List<TSource>(); } } public void OnNext(TSource value) { lock (_gate) { _list.Add(value); } } public void OnError(Exception error) { lock (_gate) { _list.Clear(); _observer.OnError(error); base.Dispose(); } } public void OnCompleted() { lock (_gate) { _observer.OnNext(_list); _observer.OnCompleted(); base.Dispose(); } } } private readonly IObservable<TSource> _source; private readonly TimeSpan _timeSpan; private readonly IScheduler _scheduler; public TimeHopping(IObservable<TSource> source, TimeSpan timeSpan, IScheduler scheduler) { _source = source; _timeSpan = timeSpan; _scheduler = scheduler; } protected override _ CreateSink(IObserver<IList<TSource>> observer, IDisposable cancel) { return new _(observer, cancel); } protected override IDisposable Run(_ sink) { return sink.Run(this); } } internal sealed class Ferry : Producer<IList<TSource>, Ferry._> { internal sealed class _ : Sink<IList<TSource>>, IObserver<TSource> { private readonly Ferry _parent; private readonly object _gate = new object(); private readonly SerialDisposable _timerD = new SerialDisposable(); private IList<TSource> _s; private int _n; private int _windowId; public _(Ferry parent, IObserver<IList<TSource>> observer, IDisposable cancel) : base(observer, cancel) { _parent = parent; } public IDisposable Run() { _s = new List<TSource>(); _n = 0; _windowId = 0; CreateTimer(0); IDisposable disposable = ObservableExtensions.SubscribeSafe<TSource>(_parent._source, (IObserver<TSource>)this); return StableCompositeDisposable.Create(_timerD, disposable); } private void CreateTimer(int id) { SingleAssignmentDisposable singleAssignmentDisposable = new SingleAssignmentDisposable(); _timerD.Disposable = singleAssignmentDisposable; singleAssignmentDisposable.Disposable = _parent._scheduler.Schedule<int>(id, _parent._timeSpan, (Func<IScheduler, int, IDisposable>)Tick); } private IDisposable Tick(IScheduler self, int id) { IDisposable empty = Disposable.Empty; int num = 0; lock (_gate) { if (id == _windowId) { _n = 0; num = ++_windowId; IList<TSource> s = _s; _s = new List<TSource>(); _observer.OnNext(s); CreateTimer(num); return empty; } return empty; } } public void OnNext(TSource value) { bool flag = false; int id = 0; lock (_gate) { _s.Add(value); _n++; if (_n == _parent._count) { flag = true; _n = 0; id = ++_windowId; IList<TSource> s = _s; _s = new List<TSource>(); _observer.OnNext(s); } if (flag) CreateTimer(id); } } public void OnError(Exception error) { lock (_gate) { _s.Clear(); _observer.OnError(error); base.Dispose(); } } public void OnCompleted() { lock (_gate) { _observer.OnNext(_s); _observer.OnCompleted(); base.Dispose(); } } } private readonly IObservable<TSource> _source; private readonly int _count; private readonly TimeSpan _timeSpan; private readonly IScheduler _scheduler; public Ferry(IObservable<TSource> source, TimeSpan timeSpan, int count, IScheduler scheduler) { _source = source; _timeSpan = timeSpan; _count = count; _scheduler = scheduler; } protected override _ CreateSink(IObserver<IList<TSource>> observer, IDisposable cancel) { return new _(this, observer, cancel); } protected override IDisposable Run(_ sink) { return sink.Run(); } } } }