Window<TSource>
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Reactive.Subjects;
namespace System.Reactive.Linq.ObservableImpl
{
internal static class Window<TSource>
{
internal sealed class Count : Producer<IObservable<TSource>, Count._>
{
internal sealed class _ : Sink<TSource, IObservable<TSource>>
{
private readonly Queue<ISubject<TSource>> _queue = new Queue<ISubject<TSource>>();
private readonly SingleAssignmentDisposable _m = new SingleAssignmentDisposable();
private readonly RefCountDisposable _refCountDisposable;
private readonly int _count;
private readonly int _skip;
private int _n;
public _(Count parent, IObserver<IObservable<TSource>> observer)
: base(observer)
{
_refCountDisposable = new RefCountDisposable(_m);
_count = parent._count;
_skip = parent._skip;
}
public override void Run(IObservable<TSource> source)
{
_n = 0;
IObservable<TSource> value = CreateWindow();
ForwardOnNext(value);
_m.Disposable = ObservableExtensions.SubscribeSafe<TSource>(source, (IObserver<TSource>)this);
SetUpstream(_refCountDisposable);
}
private IObservable<TSource> CreateWindow()
{
Subject<TSource> subject = new Subject<TSource>();
_queue.Enqueue(subject);
return new WindowObservable<TSource>(subject, _refCountDisposable);
}
public override void OnNext(TSource value)
{
foreach (ISubject<TSource> item in _queue) {
item.OnNext(value);
}
int num = _n - _count + 1;
if (num >= 0 && num % _skip == 0)
_queue.Dequeue().OnCompleted();
_n++;
if (_n % _skip == 0) {
IObservable<TSource> value2 = CreateWindow();
ForwardOnNext(value2);
}
}
public override void OnError(Exception error)
{
while (_queue.Count > 0) {
_queue.Dequeue().OnError(error);
}
ForwardOnError(error);
}
public override void OnCompleted()
{
while (_queue.Count > 0) {
_queue.Dequeue().OnCompleted();
}
ForwardOnCompleted();
}
}
private readonly IObservable<TSource> _source;
private readonly int _count;
private readonly int _skip;
public Count(IObservable<TSource> source, int count, int skip)
{
_source = source;
_count = count;
_skip = skip;
}
protected override _ CreateSink(IObserver<IObservable<TSource>> observer)
{
return new _(this, observer);
}
protected override void Run(_ sink)
{
sink.Run(_source);
}
}
internal sealed class TimeSliding : Producer<IObservable<TSource>, TimeSliding._>
{
internal sealed class _ : Sink<TSource, IObservable<TSource>>
{
private struct State
{
public bool isSpan;
public bool isShift;
}
private readonly object _gate = new object();
private readonly Queue<ISubject<TSource>> _q = new Queue<ISubject<TSource>>();
private readonly SerialDisposable _timerD = new SerialDisposable();
private readonly IScheduler _scheduler;
private readonly TimeSpan _timeShift;
private RefCountDisposable _refCountDisposable;
private TimeSpan _totalTime;
private TimeSpan _nextShift;
private TimeSpan _nextSpan;
public _(TimeSliding parent, IObserver<IObservable<TSource>> observer)
: base(observer)
{
_scheduler = parent._scheduler;
_timeShift = parent._timeShift;
}
public void Run(TimeSliding parent)
{
_totalTime = TimeSpan.Zero;
_nextShift = parent._timeShift;
_nextSpan = parent._timeSpan;
CompositeDisposable compositeDisposable = new CompositeDisposable(2) {
_timerD
};
_refCountDisposable = new RefCountDisposable(compositeDisposable);
CreateWindow();
CreateTimer();
compositeDisposable.Add(ObservableExtensions.SubscribeSafe<TSource>(parent._source, (IObserver<TSource>)this));
SetUpstream(_refCountDisposable);
}
private void CreateWindow()
{
Subject<TSource> subject = new Subject<TSource>();
_q.Enqueue(subject);
ForwardOnNext(new WindowObservable<TSource>(subject, _refCountDisposable));
}
private void CreateTimer()
{
SingleAssignmentDisposable singleAssignmentDisposable = new SingleAssignmentDisposable();
_timerD.Disposable = singleAssignmentDisposable;
bool flag = false;
bool flag2 = false;
if (_nextSpan == _nextShift) {
flag = true;
flag2 = true;
} else if (_nextSpan < _nextShift) {
flag = true;
} else {
flag2 = true;
}
TimeSpan timeSpan = flag ? _nextSpan : _nextShift;
TimeSpan dueTime = timeSpan - _totalTime;
_totalTime = timeSpan;
if (flag)
_nextSpan += _timeShift;
if (flag2)
_nextShift += _timeShift;
singleAssignmentDisposable.Disposable = _scheduler.Schedule<State>(new State {
isSpan = flag,
isShift = flag2
}, dueTime, (Func<IScheduler, State, IDisposable>)Tick);
}
private IDisposable Tick(IScheduler self, State state)
{
lock (_gate) {
if (state.isSpan)
_q.Dequeue().OnCompleted();
if (state.isShift)
CreateWindow();
}
CreateTimer();
return Disposable.Empty;
}
public override void OnNext(TSource value)
{
lock (_gate) {
foreach (ISubject<TSource> item in _q) {
item.OnNext(value);
}
}
}
public override void OnError(Exception error)
{
lock (_gate) {
foreach (ISubject<TSource> item in _q) {
item.OnError(error);
}
ForwardOnError(error);
}
}
public override void OnCompleted()
{
lock (_gate) {
foreach (ISubject<TSource> item in _q) {
item.OnCompleted();
}
ForwardOnCompleted();
}
}
}
private readonly IObservable<TSource> _source;
private readonly TimeSpan _timeSpan;
private readonly TimeSpan _timeShift;
private readonly IScheduler _scheduler;
public TimeSliding(IObservable<TSource> source, TimeSpan timeSpan, TimeSpan timeShift, IScheduler scheduler)
{
_source = source;
_timeSpan = timeSpan;
_timeShift = timeShift;
_scheduler = scheduler;
}
protected override _ CreateSink(IObserver<IObservable<TSource>> observer)
{
return new _(this, observer);
}
protected override void Run(_ sink)
{
sink.Run(this);
}
}
internal sealed class TimeHopping : Producer<IObservable<TSource>, TimeHopping._>
{
internal sealed class _ : Sink<TSource, IObservable<TSource>>
{
private readonly object _gate = new object();
private Subject<TSource> _subject;
private RefCountDisposable _refCountDisposable;
public _(IObserver<IObservable<TSource>> observer)
: base(observer)
{
}
public void Run(TimeHopping parent)
{
CompositeDisposable compositeDisposable = new CompositeDisposable(2);
_refCountDisposable = new RefCountDisposable(compositeDisposable);
CreateWindow();
compositeDisposable.Add(parent._scheduler.SchedulePeriodic(parent._timeSpan, Tick));
compositeDisposable.Add(ObservableExtensions.SubscribeSafe<TSource>(parent._source, (IObserver<TSource>)this));
SetUpstream(_refCountDisposable);
}
private void Tick()
{
lock (_gate) {
_subject.OnCompleted();
CreateWindow();
}
}
private void CreateWindow()
{
_subject = new Subject<TSource>();
ForwardOnNext(new WindowObservable<TSource>(_subject, _refCountDisposable));
}
public override void OnNext(TSource value)
{
lock (_gate) {
_subject.OnNext(value);
}
}
public override void OnError(Exception error)
{
lock (_gate) {
_subject.OnError(error);
ForwardOnError(error);
}
}
public override void OnCompleted()
{
lock (_gate) {
_subject.OnCompleted();
ForwardOnCompleted();
}
}
}
private readonly IObservable<TSource> _source;
private readonly TimeSpan _timeSpan;
private readonly IScheduler _scheduler;
public TimeHopping(IObservable<TSource> source, TimeSpan timeSpan, IScheduler scheduler)
{
_source = source;
_timeSpan = timeSpan;
_scheduler = scheduler;
}
protected override _ CreateSink(IObserver<IObservable<TSource>> observer)
{
return new _(observer);
}
protected override void Run(_ sink)
{
sink.Run(this);
}
}
internal sealed class Ferry : Producer<IObservable<TSource>, Ferry._>
{
internal sealed class _ : Sink<TSource, IObservable<TSource>>
{
private readonly object _gate = new object();
private readonly SerialDisposable _timerD = new SerialDisposable();
private readonly int _count;
private readonly TimeSpan _timeSpan;
private readonly IScheduler _scheduler;
private Subject<TSource> _s;
private int _n;
private RefCountDisposable _refCountDisposable;
public _(Ferry parent, IObserver<IObservable<TSource>> observer)
: base(observer)
{
_count = parent._count;
_timeSpan = parent._timeSpan;
_scheduler = parent._scheduler;
}
public override void Run(IObservable<TSource> source)
{
_n = 0;
CompositeDisposable compositeDisposable = new CompositeDisposable(2) {
_timerD
};
_refCountDisposable = new RefCountDisposable(compositeDisposable);
_s = new Subject<TSource>();
ForwardOnNext(new WindowObservable<TSource>(_s, _refCountDisposable));
CreateTimer(_s);
compositeDisposable.Add(ObservableExtensions.SubscribeSafe<TSource>(source, (IObserver<TSource>)this));
SetUpstream(_refCountDisposable);
}
private void CreateTimer(Subject<TSource> window)
{
SingleAssignmentDisposable singleAssignmentDisposable = new SingleAssignmentDisposable();
_timerD.Disposable = singleAssignmentDisposable;
singleAssignmentDisposable.Disposable = _scheduler.Schedule<Subject<TSource>>(window, _timeSpan, (Func<IScheduler, Subject<TSource>, IDisposable>)Tick);
}
private IDisposable Tick(IScheduler self, Subject<TSource> window)
{
IDisposable empty = Disposable.Empty;
Subject<TSource> subject = null;
lock (_gate) {
if (window != _s)
return empty;
_n = 0;
subject = new Subject<TSource>();
_s.OnCompleted();
_s = subject;
ForwardOnNext(new WindowObservable<TSource>(_s, _refCountDisposable));
}
CreateTimer(subject);
return empty;
}
public override void OnNext(TSource value)
{
Subject<TSource> subject = null;
lock (_gate) {
_s.OnNext(value);
_n++;
if (_n == _count) {
_n = 0;
subject = new Subject<TSource>();
_s.OnCompleted();
_s = subject;
ForwardOnNext(new WindowObservable<TSource>(_s, _refCountDisposable));
}
}
if (subject != null)
CreateTimer(subject);
}
public override void OnError(Exception error)
{
lock (_gate) {
_s.OnError(error);
ForwardOnError(error);
}
}
public override void OnCompleted()
{
lock (_gate) {
_s.OnCompleted();
ForwardOnCompleted();
}
}
}
private readonly IObservable<TSource> _source;
private readonly int _count;
private readonly TimeSpan _timeSpan;
private readonly IScheduler _scheduler;
public Ferry(IObservable<TSource> source, TimeSpan timeSpan, int count, IScheduler scheduler)
{
_source = source;
_timeSpan = timeSpan;
_count = count;
_scheduler = scheduler;
}
protected override _ CreateSink(IObserver<IObservable<TSource>> observer)
{
return new _(this, observer);
}
protected override void Run(_ sink)
{
sink.Run(_source);
}
}
}
}