<PackageReference Include="System.Reactive" Version="4.1.0-preview.84" />

Buffer<TSource>

static class Buffer<TSource>
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;

namespace System.Reactive.Linq.ObservableImpl
{
    /// <summary>
    /// Producer/sink pairs that group elements of a source sequence into lists:
    /// by element count (<see cref="Count"/>), by overlapping or gapped time
    /// windows (<see cref="TimeSliding"/>), by back-to-back time windows
    /// (<see cref="TimeHopping"/>), and by "time or count, whichever comes first"
    /// (<see cref="Ferry"/>).
    /// </summary>
    /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
    internal static class Buffer<TSource>
    {
        /// <summary>
        /// Buffers by element count: each buffer holds up to <c>count</c> elements
        /// and a new buffer is opened every <c>skip</c> elements.
        /// </summary>
        internal sealed class Count : Producer<IList<TSource>, Count._>
        {
            private readonly IObservable<TSource> _source;
            private readonly int _count;
            private readonly int _skip;

            /// <param name="source">Sequence whose elements are buffered.</param>
            /// <param name="count">Number of elements after which a buffer is emitted.</param>
            /// <param name="skip">
            /// Number of elements between the starts of consecutive buffers.
            /// Used as a modulus in the sink, so it must be non-zero
            /// (NOTE(review): presumably validated by the caller - confirm).
            /// </param>
            public Count(IObservable<TSource> source, int count, int skip)
            {
                _source = source;
                _count = count;
                _skip = skip;
            }

            protected override _ CreateSink(IObserver<IList<TSource>> observer)
            {
                return new _(this, observer);
            }

            protected override void Run(_ sink)
            {
                sink.Run(_source);
            }

            /// <summary>Sink that maintains the queue of currently open count-based buffers.</summary>
            internal sealed class _ : Sink<TSource, IList<TSource>>
            {
                // Buffers that are currently open, oldest first.
                private readonly Queue<IList<TSource>> _queue = new Queue<IList<TSource>>();
                private readonly int _count;
                private readonly int _skip;

                // Number of elements received so far.
                private int _n;

                public _(Count parent, IObserver<IList<TSource>> observer)
                    : base(observer)
                {
                    _count = parent._count;
                    _skip = parent._skip;
                }

                /// <summary>Resets the element counter, opens the first buffer and delegates to the base <c>Run</c>.</summary>
                public override void Run(IObservable<TSource> source)
                {
                    _n = 0;
                    CreateWindow();
                    base.Run(source);
                }

                // Opens a new, empty buffer at the tail of the queue.
                private void CreateWindow()
                {
                    _queue.Enqueue(new List<TSource>());
                }

                /// <summary>
                /// Appends <paramref name="value"/> to every open buffer, emits the oldest
                /// buffer when it is due, and opens a new buffer every <c>_skip</c> elements.
                /// </summary>
                public override void OnNext(TSource value)
                {
                    foreach (var buffer in _queue)
                    {
                        buffer.Add(value);
                    }

                    // offset >= 0 once at least _count elements have been seen; from then on
                    // the oldest buffer is closed every _skip elements.
                    var offset = _n - _count + 1;
                    if (offset >= 0 && offset % _skip == 0)
                    {
                        var full = _queue.Dequeue();
                        if (full.Count > 0)
                        {
                            ForwardOnNext(full);
                        }
                    }

                    _n++;
                    if (_n % _skip == 0)
                    {
                        CreateWindow();
                    }
                }

                /// <summary>Discards the contents of all open buffers, then forwards the error.</summary>
                public override void OnError(Exception error)
                {
                    while (_queue.Count > 0)
                    {
                        _queue.Dequeue().Clear();
                    }

                    ForwardOnError(error);
                }

                /// <summary>Emits every non-empty open buffer in order, then forwards completion.</summary>
                public override void OnCompleted()
                {
                    while (_queue.Count > 0)
                    {
                        var remaining = _queue.Dequeue();
                        if (remaining.Count > 0)
                        {
                            ForwardOnNext(remaining);
                        }
                    }

                    ForwardOnCompleted();
                }
            }
        }

        /// <summary>
        /// Buffers by time: a buffer is closed <c>timeSpan</c> after it was opened and
        /// a new buffer is opened every <c>timeShift</c>, as measured by the scheduler.
        /// </summary>
        internal sealed class TimeSliding : Producer<IList<TSource>, TimeSliding._>
        {
            private readonly IObservable<TSource> _source;
            private readonly TimeSpan _timeSpan;
            private readonly TimeSpan _timeShift;
            private readonly IScheduler _scheduler;

            /// <param name="source">Sequence whose elements are buffered.</param>
            /// <param name="timeSpan">Time after which the first buffer is closed; later closes follow every <paramref name="timeShift"/>.</param>
            /// <param name="timeShift">Interval between the openings of consecutive buffers.</param>
            /// <param name="scheduler">Scheduler on which the timers run.</param>
            public TimeSliding(IObservable<TSource> source, TimeSpan timeSpan, TimeSpan timeShift, IScheduler scheduler)
            {
                _source = source;
                _timeSpan = timeSpan;
                _timeShift = timeShift;
                _scheduler = scheduler;
            }

            protected override _ CreateSink(IObserver<IList<TSource>> observer)
            {
                return new _(this, observer);
            }

            protected override void Run(_ sink)
            {
                sink.Run(this);
            }

            /// <summary>Sink that drives buffer open/close events from a re-armed one-shot timer.</summary>
            internal sealed class _ : Sink<TSource, IList<TSource>>
            {
                // Payload handed to each scheduled tick: which event(s) the tick stands for.
                private struct State
                {
                    public bool isSpan;   // close (emit) the oldest buffer
                    public bool isShift;  // open a new buffer
                }

                private readonly TimeSpan _timeShift;
                private readonly IScheduler _scheduler;

                // Guards _q and serializes forwarding between timer ticks and source notifications.
                private readonly object _gate = new object();

                // Buffers that are currently open, oldest first.
                private readonly Queue<List<TSource>> _q = new Queue<List<TSource>>();

                private IDisposable _timerSerial;

                // Time offset (from Run) of the most recently scheduled tick.
                private TimeSpan _totalTime;

                // Time offsets (from Run) of the next buffer opening and the next buffer closing.
                private TimeSpan _nextShift;
                private TimeSpan _nextSpan;

                public _(TimeSliding parent, IObserver<IList<TSource>> observer)
                    : base(observer)
                {
                    _timeShift = parent._timeShift;
                    _scheduler = parent._scheduler;
                }

                /// <summary>Initializes the timeline, opens the first buffer, arms the first timer and delegates to the base <c>Run</c>.</summary>
                public void Run(TimeSliding parent)
                {
                    _totalTime = TimeSpan.Zero;
                    _nextShift = parent._timeShift;
                    _nextSpan = parent._timeSpan;

                    CreateWindow();
                    CreateTimer();

                    base.Run(parent._source);
                }

                protected override void Dispose(bool disposing)
                {
                    if (disposing)
                    {
                        Disposable.TryDispose(ref _timerSerial);
                    }

                    base.Dispose(disposing);
                }

                // Opens a new, empty buffer at the tail of the queue.
                private void CreateWindow()
                {
                    _q.Enqueue(new List<TSource>());
                }

                // Schedules the next tick at whichever of _nextSpan / _nextShift comes first
                // (both at once when they coincide) and advances the corresponding marker(s).
                private void CreateTimer()
                {
                    var timer = new SingleAssignmentDisposable();

                    // Install the handle before scheduling so it is tracked by _timerSerial
                    // (NOTE(review): helper presumably disposes the previous handle - confirm).
                    Disposable.TrySetSerial(ref _timerSerial, timer);

                    var isSpan = false;
                    var isShift = false;
                    if (_nextSpan == _nextShift)
                    {
                        isSpan = true;
                        isShift = true;
                    }
                    else if (_nextSpan < _nextShift)
                    {
                        isSpan = true;
                    }
                    else
                    {
                        isShift = true;
                    }

                    // Absolute offset of the upcoming event; the timer is armed with the
                    // delta relative to the previously scheduled event.
                    var newTotalTime = isSpan ? _nextSpan : _nextShift;
                    var dueTime = newTotalTime - _totalTime;
                    _totalTime = newTotalTime;

                    if (isSpan)
                    {
                        _nextSpan += _timeShift;
                    }

                    if (isShift)
                    {
                        _nextShift += _timeShift;
                    }

                    timer.Disposable = _scheduler.Schedule<State>(new State { isSpan = isSpan, isShift = isShift }, dueTime, (Func<IScheduler, State, IDisposable>)Tick);
                }

                // Timer callback: closes the oldest buffer and/or opens a new one, then re-arms the timer.
                private IDisposable Tick(IScheduler self, State state)
                {
                    lock (_gate)
                    {
                        // OnError/OnCompleted drain _q under this same gate. A tick that was
                        // already waiting for the gate would otherwise call Dequeue on an
                        // empty queue, which throws InvalidOperationException.
                        if (state.isSpan && _q.Count > 0)
                        {
                            ForwardOnNext(_q.Dequeue());
                        }

                        if (state.isShift)
                        {
                            CreateWindow();
                        }
                    }

                    CreateTimer();
                    return Disposable.Empty;
                }

                /// <summary>Appends <paramref name="value"/> to every open buffer.</summary>
                public override void OnNext(TSource value)
                {
                    lock (_gate)
                    {
                        foreach (var buffer in _q)
                        {
                            buffer.Add(value);
                        }
                    }
                }

                /// <summary>Discards the contents of all open buffers, then forwards the error.</summary>
                public override void OnError(Exception error)
                {
                    lock (_gate)
                    {
                        while (_q.Count > 0)
                        {
                            _q.Dequeue().Clear();
                        }

                        ForwardOnError(error);
                    }
                }

                /// <summary>Emits every open buffer in order (including empty ones), then forwards completion.</summary>
                public override void OnCompleted()
                {
                    lock (_gate)
                    {
                        while (_q.Count > 0)
                        {
                            ForwardOnNext(_q.Dequeue());
                        }

                        ForwardOnCompleted();
                    }
                }
            }
        }

        /// <summary>
        /// Buffers by time with back-to-back windows: a periodic timer with period
        /// <c>timeSpan</c> emits the current buffer and starts a new one.
        /// </summary>
        internal sealed class TimeHopping : Producer<IList<TSource>, TimeHopping._>
        {
            private readonly IObservable<TSource> _source;
            private readonly TimeSpan _timeSpan;
            private readonly IScheduler _scheduler;

            /// <param name="source">Sequence whose elements are buffered.</param>
            /// <param name="timeSpan">Period of the timer that closes buffers.</param>
            /// <param name="scheduler">Scheduler on which the periodic timer runs.</param>
            public TimeHopping(IObservable<TSource> source, TimeSpan timeSpan, IScheduler scheduler)
            {
                _source = source;
                _timeSpan = timeSpan;
                _scheduler = scheduler;
            }

            protected override _ CreateSink(IObserver<IList<TSource>> observer)
            {
                return new _(observer);
            }

            protected override void Run(_ sink)
            {
                sink.Run(this);
            }

            /// <summary>Sink holding the single current buffer, swapped out on each periodic tick.</summary>
            internal sealed class _ : Sink<TSource, IList<TSource>>
            {
                // Guards _list and serializes forwarding between timer ticks and source notifications.
                private readonly object _gate = new object();
                private List<TSource> _list;
                private IDisposable _periodicDisposable;

                public _(IObserver<IList<TSource>> observer)
                    : base(observer)
                {
                }

                /// <summary>Creates the first buffer, starts the periodic timer and delegates to the base <c>Run</c>.</summary>
                public void Run(TimeHopping parent)
                {
                    _list = new List<TSource>();

                    Disposable.SetSingle(ref _periodicDisposable, parent._scheduler.SchedulePeriodic(parent._timeSpan, Tick));

                    base.Run(parent._source);
                }

                protected override void Dispose(bool disposing)
                {
                    if (disposing)
                    {
                        Disposable.TryDispose(ref _periodicDisposable);
                    }

                    base.Dispose(disposing);
                }

                // Periodic callback: emits the current buffer (even when empty) and starts a fresh one.
                private void Tick()
                {
                    lock (_gate)
                    {
                        ForwardOnNext(_list);
                        _list = new List<TSource>();
                    }
                }

                /// <summary>Appends <paramref name="value"/> to the current buffer.</summary>
                public override void OnNext(TSource value)
                {
                    lock (_gate)
                    {
                        _list.Add(value);
                    }
                }

                /// <summary>Discards the current buffer's contents, then forwards the error.</summary>
                public override void OnError(Exception error)
                {
                    lock (_gate)
                    {
                        _list.Clear();
                        ForwardOnError(error);
                    }
                }

                /// <summary>Emits the current buffer, then forwards completion.</summary>
                public override void OnCompleted()
                {
                    lock (_gate)
                    {
                        ForwardOnNext(_list);
                        ForwardOnCompleted();
                    }
                }
            }
        }

        /// <summary>
        /// Buffers by time or count, whichever comes first: the current buffer is
        /// emitted when it reaches <c>count</c> elements or when <c>timeSpan</c>
        /// elapses since it was opened.
        /// </summary>
        internal sealed class Ferry : Producer<IList<TSource>, Ferry._>
        {
            private readonly IObservable<TSource> _source;
            private readonly int _count;
            private readonly TimeSpan _timeSpan;
            private readonly IScheduler _scheduler;

            /// <param name="source">Sequence whose elements are buffered.</param>
            /// <param name="timeSpan">Maximum time a buffer stays open.</param>
            /// <param name="count">Number of elements at which a buffer is emitted early.</param>
            /// <param name="scheduler">Scheduler on which the timers run.</param>
            public Ferry(IObservable<TSource> source, TimeSpan timeSpan, int count, IScheduler scheduler)
            {
                _source = source;
                _timeSpan = timeSpan;
                _count = count;
                _scheduler = scheduler;
            }

            protected override _ CreateSink(IObserver<IList<TSource>> observer)
            {
                return new _(this, observer);
            }

            protected override void Run(_ sink)
            {
                sink.Run();
            }

            /// <summary>Sink holding the single current buffer plus a per-buffer timeout timer.</summary>
            internal sealed class _ : Sink<TSource, IList<TSource>>
            {
                private readonly Ferry _parent;

                // Guards _s, _n and _windowId and serializes forwarding.
                private readonly object _gate = new object();
                private IDisposable _timerSerial;

                // Current buffer and the number of elements added to it.
                private IList<TSource> _s;
                private int _n;

                // Identifier of the current buffer; lets a timer tick detect that the buffer
                // it was armed for has already been emitted because it filled up.
                private int _windowId;

                public _(Ferry parent, IObserver<IList<TSource>> observer)
                    : base(observer)
                {
                    _parent = parent;
                }

                /// <summary>Creates the first buffer, arms its timer and subscribes to the source.</summary>
                public void Run()
                {
                    _s = new List<TSource>();
                    _n = 0;
                    _windowId = 0;

                    CreateTimer(0);

                    SetUpstream(ObservableExtensions.SubscribeSafe<TSource>(_parent._source, (IObserver<TSource>)this));
                }

                protected override void Dispose(bool disposing)
                {
                    if (disposing)
                    {
                        Disposable.TryDispose(ref _timerSerial);
                    }

                    base.Dispose(disposing);
                }

                // Arms a one-shot timer for the buffer identified by id.
                private void CreateTimer(int id)
                {
                    var timer = new SingleAssignmentDisposable();

                    // Install the handle before scheduling so it is tracked by _timerSerial
                    // (NOTE(review): helper presumably disposes the previous handle - confirm).
                    Disposable.TrySetSerial(ref _timerSerial, timer);

                    timer.Disposable = _parent._scheduler.Schedule<int>(id, _parent._timeSpan, (Func<IScheduler, int, IDisposable>)Tick);
                }

                // Timer callback: if the buffer it was armed for is still current, emits it,
                // starts the next buffer and arms that buffer's timer. Stale ticks are ignored.
                private IDisposable Tick(IScheduler self, int id)
                {
                    lock (_gate)
                    {
                        if (id != _windowId)
                        {
                            return Disposable.Empty;
                        }

                        _n = 0;
                        var newId = ++_windowId;

                        var full = _s;
                        _s = new List<TSource>();
                        ForwardOnNext(full);

                        CreateTimer(newId);
                    }

                    return Disposable.Empty;
                }

                /// <summary>
                /// Appends <paramref name="value"/> to the current buffer; when the buffer
                /// reaches the configured count it is emitted, a new buffer is started and
                /// the timer is re-armed for the new buffer.
                /// </summary>
                public override void OnNext(TSource value)
                {
                    lock (_gate)
                    {
                        _s.Add(value);
                        _n++;

                        if (_n == _parent._count)
                        {
                            _n = 0;
                            var newId = ++_windowId;

                            var full = _s;
                            _s = new List<TSource>();
                            ForwardOnNext(full);

                            CreateTimer(newId);
                        }
                    }
                }

                /// <summary>Discards the current buffer's contents, then forwards the error.</summary>
                public override void OnError(Exception error)
                {
                    lock (_gate)
                    {
                        _s.Clear();
                        ForwardOnError(error);
                    }
                }

                /// <summary>Emits the current buffer, then forwards completion.</summary>
                public override void OnCompleted()
                {
                    lock (_gate)
                    {
                        ForwardOnNext(_s);
                        ForwardOnCompleted();
                    }
                }
            }
        }
    }
}