<PackageReference Include="System.Reactive" Version="4.1.0-preview.84" />

Window<TSource>

static class Window<TSource>
using System.Collections.Generic; using System.Reactive.Concurrency; using System.Reactive.Disposables; using System.Reactive.Subjects; namespace System.Reactive.Linq.ObservableImpl { internal static class Window<TSource> { internal sealed class Count : Producer<IObservable<TSource>, Count._> { internal sealed class _ : Sink<TSource, IObservable<TSource>> { private readonly Queue<ISubject<TSource>> _queue = new Queue<ISubject<TSource>>(); private readonly SingleAssignmentDisposable _m = new SingleAssignmentDisposable(); private readonly RefCountDisposable _refCountDisposable; private readonly int _count; private readonly int _skip; private int _n; public _(Count parent, IObserver<IObservable<TSource>> observer) : base(observer) { _refCountDisposable = new RefCountDisposable(_m); _count = parent._count; _skip = parent._skip; } public override void Run(IObservable<TSource> source) { _n = 0; IObservable<TSource> value = CreateWindow(); ForwardOnNext(value); _m.Disposable = ObservableExtensions.SubscribeSafe<TSource>(source, (IObserver<TSource>)this); SetUpstream(_refCountDisposable); } private IObservable<TSource> CreateWindow() { Subject<TSource> subject = new Subject<TSource>(); _queue.Enqueue(subject); return new WindowObservable<TSource>(subject, _refCountDisposable); } public override void OnNext(TSource value) { foreach (ISubject<TSource> item in _queue) { item.OnNext(value); } int num = _n - _count + 1; if (num >= 0 && num % _skip == 0) _queue.Dequeue().OnCompleted(); _n++; if (_n % _skip == 0) { IObservable<TSource> value2 = CreateWindow(); ForwardOnNext(value2); } } public override void OnError(Exception error) { while (_queue.Count > 0) { _queue.Dequeue().OnError(error); } ForwardOnError(error); } public override void OnCompleted() { while (_queue.Count > 0) { _queue.Dequeue().OnCompleted(); } ForwardOnCompleted(); } } private readonly IObservable<TSource> _source; private readonly int _count; private readonly int _skip; public Count(IObservable<TSource> source, int count, int skip) { _source = source; _count = count; _skip = skip; } protected override _ CreateSink(IObserver<IObservable<TSource>> observer) { return new _(this, observer); } protected override void Run(_ sink) { sink.Run(_source); } } internal sealed class TimeSliding : Producer<IObservable<TSource>, TimeSliding._> { internal sealed class _ : Sink<TSource, IObservable<TSource>> { private struct State { public bool isSpan; public bool isShift; } private readonly object _gate = new object(); private readonly Queue<ISubject<TSource>> _q = new Queue<ISubject<TSource>>(); private readonly SerialDisposable _timerD = new SerialDisposable(); private readonly IScheduler _scheduler; private readonly TimeSpan _timeShift; private RefCountDisposable _refCountDisposable; private TimeSpan _totalTime; private TimeSpan _nextShift; private TimeSpan _nextSpan; public _(TimeSliding parent, IObserver<IObservable<TSource>> observer) : base(observer) { _scheduler = parent._scheduler; _timeShift = parent._timeShift; } public void Run(TimeSliding parent) { _totalTime = TimeSpan.Zero; _nextShift = parent._timeShift; _nextSpan = parent._timeSpan; CompositeDisposable compositeDisposable = new CompositeDisposable(2) { _timerD }; _refCountDisposable = new RefCountDisposable(compositeDisposable); CreateWindow(); CreateTimer(); compositeDisposable.Add(ObservableExtensions.SubscribeSafe<TSource>(parent._source, (IObserver<TSource>)this)); SetUpstream(_refCountDisposable); } private void CreateWindow() { Subject<TSource> subject = new Subject<TSource>(); _q.Enqueue(subject); ForwardOnNext(new WindowObservable<TSource>(subject, _refCountDisposable)); } private void CreateTimer() { SingleAssignmentDisposable singleAssignmentDisposable = new SingleAssignmentDisposable(); _timerD.Disposable = singleAssignmentDisposable; bool flag = false; bool flag2 = false; if (_nextSpan == _nextShift) { flag = true; flag2 = true; } else if (_nextSpan < _nextShift) { flag = true; } else { flag2 = true; } TimeSpan timeSpan = flag ? _nextSpan : _nextShift; TimeSpan dueTime = timeSpan - _totalTime; _totalTime = timeSpan; if (flag) _nextSpan += _timeShift; if (flag2) _nextShift += _timeShift; singleAssignmentDisposable.Disposable = _scheduler.Schedule<State>(new State { isSpan = flag, isShift = flag2 }, dueTime, (Func<IScheduler, State, IDisposable>)Tick); } private IDisposable Tick(IScheduler self, State state) { lock (_gate) { if (state.isSpan) _q.Dequeue().OnCompleted(); if (state.isShift) CreateWindow(); } CreateTimer(); return Disposable.Empty; } public override void OnNext(TSource value) { lock (_gate) { foreach (ISubject<TSource> item in _q) { item.OnNext(value); } } } public override void OnError(Exception error) { lock (_gate) { foreach (ISubject<TSource> item in _q) { item.OnError(error); } ForwardOnError(error); } } public override void OnCompleted() { lock (_gate) { foreach (ISubject<TSource> item in _q) { item.OnCompleted(); } ForwardOnCompleted(); } } } private readonly IObservable<TSource> _source; private readonly TimeSpan _timeSpan; private readonly TimeSpan _timeShift; private readonly IScheduler _scheduler; public TimeSliding(IObservable<TSource> source, TimeSpan timeSpan, TimeSpan timeShift, IScheduler scheduler) { _source = source; _timeSpan = timeSpan; _timeShift = timeShift; _scheduler = scheduler; } protected override _ CreateSink(IObserver<IObservable<TSource>> observer) { return new _(this, observer); } protected override void Run(_ sink) { sink.Run(this); } } internal sealed class TimeHopping : Producer<IObservable<TSource>, TimeHopping._> { internal sealed class _ : Sink<TSource, IObservable<TSource>> { private readonly object _gate = new object(); private Subject<TSource> _subject; private RefCountDisposable _refCountDisposable; public _(IObserver<IObservable<TSource>> observer) : base(observer) { } public void Run(TimeHopping parent) { CompositeDisposable compositeDisposable = new CompositeDisposable(2); _refCountDisposable = new RefCountDisposable(compositeDisposable); CreateWindow(); compositeDisposable.Add(parent._scheduler.SchedulePeriodic(parent._timeSpan, Tick)); compositeDisposable.Add(ObservableExtensions.SubscribeSafe<TSource>(parent._source, (IObserver<TSource>)this)); SetUpstream(_refCountDisposable); } private void Tick() { lock (_gate) { _subject.OnCompleted(); CreateWindow(); } } private void CreateWindow() { _subject = new Subject<TSource>(); ForwardOnNext(new WindowObservable<TSource>(_subject, _refCountDisposable)); } public override void OnNext(TSource value) { lock (_gate) { _subject.OnNext(value); } } public override void OnError(Exception error) { lock (_gate) { _subject.OnError(error); ForwardOnError(error); } } public override void OnCompleted() { lock (_gate) { _subject.OnCompleted(); ForwardOnCompleted(); } } } private readonly IObservable<TSource> _source; private readonly TimeSpan _timeSpan; private readonly IScheduler _scheduler; public TimeHopping(IObservable<TSource> source, TimeSpan timeSpan, IScheduler scheduler) { _source = source; _timeSpan = timeSpan; _scheduler = scheduler; } protected override _ CreateSink(IObserver<IObservable<TSource>> observer) { return new _(observer); } protected override void Run(_ sink) { sink.Run(this); } } internal sealed class Ferry : Producer<IObservable<TSource>, Ferry._> { internal sealed class _ : Sink<TSource, IObservable<TSource>> { private readonly object _gate = new object(); private readonly SerialDisposable _timerD = new SerialDisposable(); private readonly int _count; private readonly TimeSpan _timeSpan; private readonly IScheduler _scheduler; private Subject<TSource> _s; private int _n; private RefCountDisposable _refCountDisposable; public _(Ferry parent, IObserver<IObservable<TSource>> observer) : base(observer) { _count = parent._count; _timeSpan = parent._timeSpan; _scheduler = parent._scheduler; } public override void Run(IObservable<TSource> source) { _n = 0; CompositeDisposable compositeDisposable = new CompositeDisposable(2) { _timerD }; _refCountDisposable = new RefCountDisposable(compositeDisposable); _s = new Subject<TSource>(); ForwardOnNext(new WindowObservable<TSource>(_s, _refCountDisposable)); CreateTimer(_s); compositeDisposable.Add(ObservableExtensions.SubscribeSafe<TSource>(source, (IObserver<TSource>)this)); SetUpstream(_refCountDisposable); } private void CreateTimer(Subject<TSource> window) { SingleAssignmentDisposable singleAssignmentDisposable = new SingleAssignmentDisposable(); _timerD.Disposable = singleAssignmentDisposable; singleAssignmentDisposable.Disposable = _scheduler.Schedule<Subject<TSource>>(window, _timeSpan, (Func<IScheduler, Subject<TSource>, IDisposable>)Tick); } private IDisposable Tick(IScheduler self, Subject<TSource> window) { IDisposable empty = Disposable.Empty; Subject<TSource> subject = null; lock (_gate) { if (window != _s) return empty; _n = 0; subject = new Subject<TSource>(); _s.OnCompleted(); _s = subject; ForwardOnNext(new WindowObservable<TSource>(_s, _refCountDisposable)); } CreateTimer(subject); return empty; } public override void OnNext(TSource value) { Subject<TSource> subject = null; lock (_gate) { _s.OnNext(value); _n++; if (_n == _count) { _n = 0; subject = new Subject<TSource>(); _s.OnCompleted(); _s = subject; ForwardOnNext(new WindowObservable<TSource>(_s, _refCountDisposable)); } } if (subject != null) CreateTimer(subject); } public override void OnError(Exception error) { lock (_gate) { _s.OnError(error); ForwardOnError(error); } } public override void OnCompleted() { lock (_gate) { _s.OnCompleted(); ForwardOnCompleted(); } } } private readonly IObservable<TSource> _source; private readonly int _count; private readonly TimeSpan _timeSpan; private readonly IScheduler _scheduler; public Ferry(IObservable<TSource> source, TimeSpan timeSpan, int count, IScheduler scheduler) { _source = source; _timeSpan = timeSpan; _count = count; _scheduler = scheduler; } protected override _ CreateSink(IObserver<IObservable<TSource>> observer) { return new _(this, observer); } protected override void Run(_ sink) { sink.Run(_source); } } } }