CombineLatest<TFirst, TSecond, TResult>
sealed class CombineLatest<TFirst, TSecond, TResult> : Producer<TResult, _<TFirst, TSecond, TResult>>
using System.Reactive.Disposables;
namespace System.Reactive.Linq.ObservableImpl
{
internal sealed class CombineLatest<TFirst, TSecond, TResult> : Producer<TResult, CombineLatest<TFirst, TSecond, TResult>._>
{
internal sealed class _ : IdentitySink<TResult>
{
private sealed class FirstObserver : IObserver<TFirst>
{
private readonly _ _parent;
private SecondObserver _other;
public SecondObserver Other {
set {
_other = value;
}
}
public bool HasValue { get; set; }
public TFirst Value { get; set; }
public bool Done { get; set; }
public FirstObserver(_ parent)
{
_parent = parent;
}
public void OnNext(TFirst value)
{
lock (_parent._gate) {
HasValue = true;
Value = value;
if (_other.HasValue) {
TResult val = default(TResult);
try {
val = _parent._resultSelector(value, _other.Value);
} catch (Exception error) {
_parent.ForwardOnError(error);
return;
}
_parent.ForwardOnNext(val);
} else if (_other.Done) {
_parent.ForwardOnCompleted();
}
}
}
public void OnError(Exception error)
{
lock (_parent._gate) {
_parent.ForwardOnError(error);
}
}
public void OnCompleted()
{
lock (_parent._gate) {
Done = true;
if (_other.Done)
_parent.ForwardOnCompleted();
else
Disposable.TryDispose(ref _parent._firstDisposable);
}
}
}
private sealed class SecondObserver : IObserver<TSecond>
{
private readonly _ _parent;
private FirstObserver _other;
public FirstObserver Other {
set {
_other = value;
}
}
public bool HasValue { get; set; }
public TSecond Value { get; set; }
public bool Done { get; set; }
public SecondObserver(_ parent)
{
_parent = parent;
}
public void OnNext(TSecond value)
{
lock (_parent._gate) {
HasValue = true;
Value = value;
if (_other.HasValue) {
TResult val = default(TResult);
try {
val = _parent._resultSelector(_other.Value, value);
} catch (Exception error) {
_parent.ForwardOnError(error);
return;
}
_parent.ForwardOnNext(val);
} else if (_other.Done) {
_parent.ForwardOnCompleted();
}
}
}
public void OnError(Exception error)
{
lock (_parent._gate) {
_parent.ForwardOnError(error);
}
}
public void OnCompleted()
{
lock (_parent._gate) {
Done = true;
if (_other.Done)
_parent.ForwardOnCompleted();
else
Disposable.TryDispose(ref _parent._secondDisposable);
}
}
}
private readonly Func<TFirst, TSecond, TResult> _resultSelector;
private object _gate;
private IDisposable _firstDisposable;
private IDisposable _secondDisposable;
public _(Func<TFirst, TSecond, TResult> resultSelector, IObserver<TResult> observer)
: base(observer)
{
_resultSelector = resultSelector;
}
public void Run(IObservable<TFirst> first, IObservable<TSecond> second)
{
_gate = new object();
FirstObserver firstObserver = new FirstObserver(this);
SecondObserver secondObserver2 = firstObserver.Other = new SecondObserver(this);
secondObserver2.Other = firstObserver;
Disposable.SetSingle(ref _firstDisposable, ObservableExtensions.SubscribeSafe<TFirst>(first, (IObserver<TFirst>)firstObserver));
Disposable.SetSingle(ref _secondDisposable, ObservableExtensions.SubscribeSafe<TSecond>(second, (IObserver<TSecond>)secondObserver2));
}
protected override void Dispose(bool disposing)
{
if (disposing) {
Disposable.TryDispose(ref _firstDisposable);
Disposable.TryDispose(ref _secondDisposable);
}
base.Dispose(disposing);
}
}
private readonly IObservable<TFirst> _first;
private readonly IObservable<TSecond> _second;
private readonly Func<TFirst, TSecond, TResult> _resultSelector;
public CombineLatest(IObservable<TFirst> first, IObservable<TSecond> second, Func<TFirst, TSecond, TResult> resultSelector)
{
_first = first;
_second = second;
_resultSelector = resultSelector;
}
protected override _ CreateSink(IObserver<TResult> observer)
{
return new _(_resultSelector, observer);
}
protected override void Run(_ sink)
{
sink.Run(_first, _second);
}
}
}