// Zip<TFirst, TSecond, TResult>
using System.Collections.Generic;
using System.Reactive.Disposables;
namespace System.Reactive.Linq.ObservableImpl
{
internal static class Zip<TFirst, TSecond, TResult>
{
internal sealed class Observable : Producer<TResult, Observable._>
{
internal sealed class _ : Sink<TResult>
{
/// <summary>
/// Observer attached to the first source. Values that arrive before the second
/// source has produced a partner are buffered in a queue; otherwise they are
/// combined with the oldest buffered value of the second source.
/// </summary>
/// <remarks>
/// The queue and the <see cref="Done"/> flag are read by the sibling
/// <see cref="SecondObserver"/>, so every access happens under the parent's gate.
/// </remarks>
private sealed class FirstObserver : IObserver<TFirst>, IDisposable
{
    private readonly _ _parent;

    // Handle for this observer's own subscription (supplied by the caller).
    private readonly IDisposable _self;

    private readonly Queue<TFirst> _queue;

    private SecondObserver _other;

    /// <summary>Wires up the sibling observer; must be set before notifications arrive.</summary>
    public SecondObserver Other {
        set {
            _other = value;
        }
    }

    /// <summary>Values of the first source that are still waiting for a partner.</summary>
    public Queue<TFirst> Queue => _queue;

    /// <summary>Set to true once the first source has signalled completion.</summary>
    public bool Done { get; set; }

    /// <param name="parent">Sink that owns the gate, the selector and the downstream observer.</param>
    /// <param name="self">Disposable that is disposed when this source completes before the other one.</param>
    public FirstObserver(_ parent, IDisposable self)
    {
        _parent = parent;
        _self = self;
        _queue = new Queue<TFirst>();
    }

    /// <summary>
    /// Pairs <paramref name="value"/> with the oldest queued second value if there is
    /// one; completes downstream if the second source is done and has nothing queued;
    /// otherwise buffers the value.
    /// </summary>
    public void OnNext(TFirst value)
    {
        lock (_parent._gate) {
            if (_other.Queue.Count > 0) {
                TSecond partner = _other.Queue.Dequeue();
                TResult result = default(TResult);
                try {
                    result = _parent._resultSelector(value, partner);
                } catch (Exception error) {
                    // A failing selector terminates the whole sequence.
                    _parent._observer.OnError(error);
                    _parent.Dispose();
                    return;
                }
                _parent._observer.OnNext(result);
            } else if (_other.Done) {
                // No partner is queued and none can arrive any more.
                _parent._observer.OnCompleted();
                _parent.Dispose();
            } else {
                _queue.Enqueue(value);
            }
        }
    }

    /// <summary>Forwards the error downstream and disposes the parent sink.</summary>
    public void OnError(Exception error)
    {
        lock (_parent._gate) {
            _parent._observer.OnError(error);
            _parent.Dispose();
        }
    }

    /// <summary>
    /// Marks this source as done. If the other source is done as well the result
    /// completes; otherwise only this observer's own subscription handle is disposed
    /// and the queued values stay available for pairing.
    /// </summary>
    public void OnCompleted()
    {
        lock (_parent._gate) {
            Done = true;
            if (_other.Done) {
                _parent._observer.OnCompleted();
                _parent.Dispose();
            } else {
                _self.Dispose();
            }
        }
    }

    /// <summary>Drops all buffered values.</summary>
    public void Dispose()
    {
        // Queue<T> is not thread-safe and the sibling observer dequeues from this
        // queue while holding the gate. Dispose may be invoked from another thread,
        // so the clear has to take the same gate. If a _parent.Dispose() call made
        // while the gate is held ends up here on the same thread, re-entering is
        // fine because C# locks are reentrant.
        lock (_parent._gate) {
            _queue.Clear();
        }
    }
}
/// <summary>
/// Observer attached to the second source. Values that arrive before the first
/// source has produced a partner are buffered in a queue; otherwise they are
/// combined with the oldest buffered value of the first source.
/// </summary>
/// <remarks>
/// The queue and the <see cref="Done"/> flag are read by the sibling
/// <see cref="FirstObserver"/>, so every access happens under the parent's gate.
/// </remarks>
private sealed class SecondObserver : IObserver<TSecond>, IDisposable
{
    private readonly _ _parent;

    // Handle for this observer's own subscription (supplied by the caller).
    private readonly IDisposable _self;

    private readonly Queue<TSecond> _queue;

    private FirstObserver _other;

    /// <summary>Wires up the sibling observer; must be set before notifications arrive.</summary>
    public FirstObserver Other {
        set {
            _other = value;
        }
    }

    /// <summary>Values of the second source that are still waiting for a partner.</summary>
    public Queue<TSecond> Queue => _queue;

    /// <summary>Set to true once the second source has signalled completion.</summary>
    public bool Done { get; set; }

    /// <param name="parent">Sink that owns the gate, the selector and the downstream observer.</param>
    /// <param name="self">Disposable that is disposed when this source completes before the other one.</param>
    public SecondObserver(_ parent, IDisposable self)
    {
        _parent = parent;
        _self = self;
        _queue = new Queue<TSecond>();
    }

    /// <summary>
    /// Pairs <paramref name="value"/> with the oldest queued first value if there is
    /// one; completes downstream if the first source is done and has nothing queued;
    /// otherwise buffers the value.
    /// </summary>
    public void OnNext(TSecond value)
    {
        lock (_parent._gate) {
            if (_other.Queue.Count > 0) {
                TFirst partner = _other.Queue.Dequeue();
                TResult result = default(TResult);
                try {
                    // The selector always receives (first, second) in that order.
                    result = _parent._resultSelector(partner, value);
                } catch (Exception error) {
                    // A failing selector terminates the whole sequence.
                    _parent._observer.OnError(error);
                    _parent.Dispose();
                    return;
                }
                _parent._observer.OnNext(result);
            } else if (_other.Done) {
                // No partner is queued and none can arrive any more.
                _parent._observer.OnCompleted();
                _parent.Dispose();
            } else {
                _queue.Enqueue(value);
            }
        }
    }

    /// <summary>Forwards the error downstream and disposes the parent sink.</summary>
    public void OnError(Exception error)
    {
        lock (_parent._gate) {
            _parent._observer.OnError(error);
            _parent.Dispose();
        }
    }

    /// <summary>
    /// Marks this source as done. If the other source is done as well the result
    /// completes; otherwise only this observer's own subscription handle is disposed
    /// and the queued values stay available for pairing.
    /// </summary>
    public void OnCompleted()
    {
        lock (_parent._gate) {
            Done = true;
            if (_other.Done) {
                _parent._observer.OnCompleted();
                _parent.Dispose();
            } else {
                _self.Dispose();
            }
        }
    }

    /// <summary>Drops all buffered values.</summary>
    public void Dispose()
    {
        // Queue<T> is not thread-safe and the sibling observer dequeues from this
        // queue while holding the gate. Dispose may be invoked from another thread,
        // so the clear has to take the same gate. If a _parent.Dispose() call made
        // while the gate is held ends up here on the same thread, re-entering is
        // fine because C# locks are reentrant.
        lock (_parent._gate) {
            _queue.Clear();
        }
    }
}
// Combines one value of each source into a result element.
private readonly Func<TFirst, TSecond, TResult> _resultSelector;

// Lock shared by both inner observers. It is created together with the sink and
// is readonly, so it can never be observed as null or replaced while held.
private readonly object _gate = new object();

/// <summary>Creates the sink that merges both sources into <paramref name="observer"/>.</summary>
/// <param name="resultSelector">Function applied to each (first, second) pair.</param>
/// <param name="observer">Downstream observer, passed to the base sink.</param>
/// <param name="cancel">Cancellation handle, passed to the base sink.</param>
public _(Func<TFirst, TSecond, TResult> resultSelector, IObserver<TResult> observer, IDisposable cancel)
    : base(observer, cancel)
{
    _resultSelector = resultSelector;
}

/// <summary>
/// Creates one observer per source, links them to each other and subscribes them.
/// </summary>
/// <returns>
/// A composite built from both subscription holders and both observers.
/// </returns>
public IDisposable Run(IObservable<TFirst> first, IObservable<TSecond> second)
{
    SingleAssignmentDisposable firstSubscription = new SingleAssignmentDisposable();
    SingleAssignmentDisposable secondSubscription = new SingleAssignmentDisposable();

    FirstObserver firstObserver = new FirstObserver(this, firstSubscription);
    SecondObserver secondObserver = new SecondObserver(this, secondSubscription);

    // Each observer reads the other's queue and Done flag, so both links must be
    // in place before either source is subscribed.
    firstObserver.Other = secondObserver;
    secondObserver.Other = firstObserver;

    firstSubscription.Disposable = ObservableExtensions.SubscribeSafe<TFirst>(first, firstObserver);
    secondSubscription.Disposable = ObservableExtensions.SubscribeSafe<TSecond>(second, secondObserver);

    return StableCompositeDisposable.Create(firstSubscription, secondSubscription, firstObserver, secondObserver);
}
}
private readonly IObservable<TFirst> _first;
private readonly IObservable<TSecond> _second;
private readonly Func<TFirst, TSecond, TResult> _resultSelector;

/// <summary>
/// Stores the two observable sources and the selector; nothing is subscribed here.
/// </summary>
/// <param name="first">Source providing the first argument of each pair.</param>
/// <param name="second">Source providing the second argument of each pair.</param>
/// <param name="resultSelector">Function that turns a pair into a result element.</param>
public Observable(IObservable<TFirst> first, IObservable<TSecond> second, Func<TFirst, TSecond, TResult> resultSelector)
{
    _resultSelector = resultSelector;
    _second = second;
    _first = first;
}
/// <summary>Builds a new sink bound to the stored result selector.</summary>
protected override _ CreateSink(IObserver<TResult> observer, IDisposable cancel)
    => new _(_resultSelector, observer, cancel);
/// <summary>Starts the sink on the two stored sources and returns its disposable.</summary>
protected override IDisposable Run(_ sink)
    => sink.Run(_first, _second);
}
internal sealed class Enumerable : Producer<TResult, Enumerable._>
{
/// <summary>
/// Sink that observes the first source itself and, for every element it receives,
/// pulls the next element from an enumerator over the second sequence.
/// </summary>
internal sealed class _ : Sink<TResult>, IObserver<TFirst>
{
    private readonly Func<TFirst, TSecond, TResult> _resultSelector;

    // Assigned in Run; advanced once per element of the first source.
    private IEnumerator<TSecond> _rightEnumerator;

    /// <param name="resultSelector">Function applied to each (first, second) pair.</param>
    /// <param name="observer">Downstream observer, passed to the base sink.</param>
    /// <param name="cancel">Cancellation handle, passed to the base sink.</param>
    public _(Func<TFirst, TSecond, TResult> resultSelector, IObserver<TResult> observer, IDisposable cancel)
        : base(observer, cancel)
    {
        _resultSelector = resultSelector;
    }

    /// <summary>
    /// Obtains the enumerator and subscribes to <paramref name="first"/>. If the
    /// enumerator cannot be obtained the error is forwarded and an empty disposable
    /// is returned without subscribing.
    /// </summary>
    public IDisposable Run(IObservable<TFirst> first, IEnumerable<TSecond> second)
    {
        try {
            _rightEnumerator = second.GetEnumerator();
        } catch (Exception error) {
            OnError(error);
            return Disposable.Empty;
        }

        IDisposable subscription = ObservableExtensions.SubscribeSafe<TFirst>(first, this);
        return StableCompositeDisposable.Create(subscription, _rightEnumerator);
    }

    /// <summary>
    /// Advances the enumerator and emits the selector result for the pair; completes
    /// when the enumerator has no more elements. Any exception thrown while advancing,
    /// reading the current element or running the selector is forwarded as an error.
    /// </summary>
    public void OnNext(TFirst value)
    {
        bool hasPartner;
        TResult result = default(TResult);
        try {
            hasPartner = _rightEnumerator.MoveNext();
            if (hasPartner) {
                result = _resultSelector(value, _rightEnumerator.Current);
            }
        } catch (Exception error) {
            OnError(error);
            return;
        }

        if (!hasPartner) {
            // The enumerable ran out, so no further pairs can be formed.
            OnCompleted();
            return;
        }

        _observer.OnNext(result);
    }

    /// <summary>Forwards the error downstream and disposes the sink.</summary>
    public void OnError(Exception error)
    {
        _observer.OnError(error);
        base.Dispose();
    }

    /// <summary>Forwards completion downstream and disposes the sink.</summary>
    public void OnCompleted()
    {
        _observer.OnCompleted();
        base.Dispose();
    }
}
private readonly IObservable<TFirst> _first;
private readonly IEnumerable<TSecond> _second;
private readonly Func<TFirst, TSecond, TResult> _resultSelector;

/// <summary>
/// Stores the observable source, the enumerable sequence and the selector;
/// nothing is subscribed or enumerated here.
/// </summary>
/// <param name="first">Observable providing the first argument of each pair.</param>
/// <param name="second">Enumerable providing the second argument of each pair.</param>
/// <param name="resultSelector">Function that turns a pair into a result element.</param>
public Enumerable(IObservable<TFirst> first, IEnumerable<TSecond> second, Func<TFirst, TSecond, TResult> resultSelector)
{
    _resultSelector = resultSelector;
    _second = second;
    _first = first;
}
/// <summary>Builds a new sink bound to the stored result selector.</summary>
protected override _ CreateSink(IObserver<TResult> observer, IDisposable cancel)
    => new _(_resultSelector, observer, cancel);
/// <summary>Starts the sink on the stored observable and enumerable and returns its disposable.</summary>
protected override IDisposable Run(_ sink)
    => sink.Run(_first, _second);
}
}
}