// CombineLatest<TSource, TResult>
using System.Collections.Generic;
using System.Collections.ObjectModel;
using System.Linq;
using System.Reactive.Disposables;
namespace System.Reactive.Linq.ObservableImpl
{
/// <summary>
/// Producer that holds a collection of observable sources and a result selector,
/// and wires them to a sink that emits a selector result built from the most
/// recent value of every source.
/// </summary>
/// <typeparam name="TSource">Element type of the source sequences.</typeparam>
/// <typeparam name="TResult">Element type produced by the result selector.</typeparam>
internal sealed class CombineLatest<TSource, TResult> : Producer<TResult, CombineLatest<TSource, TResult>._>
{
    /// <summary>
    /// Sink that subscribes to every source, remembers the latest value of each one,
    /// and forwards a selector result downstream once every source has produced a value.
    /// All notifications coming from the sources are handled under a single lock.
    /// </summary>
    internal sealed class _ : IdentitySink<TResult>
    {
        /// <summary>
        /// Observer attached to one source; it relays each notification to the
        /// parent sink together with the index of the source it belongs to.
        /// </summary>
        private sealed class SourceObserver : SafeObserver<TSource>
        {
            private readonly _ _parent;
            private readonly int _index;

            /// <param name="parent">Sink that receives the relayed notifications.</param>
            /// <param name="index">Position of the observed source in the source array.</param>
            public SourceObserver(_ parent, int index)
            {
                _parent = parent;
                _index = index;
            }

            /// <summary>Passes the value and this observer's source index to the parent.</summary>
            public override void OnNext(TSource value) => _parent.OnNext(_index, value);

            /// <summary>Passes the error to the parent.</summary>
            public override void OnError(Exception error) => _parent.OnError(error);

            /// <summary>Tells the parent that the source at this observer's index completed.</summary>
            public override void OnCompleted() => _parent.OnCompleted(_index);
        }

        private readonly Func<IList<TSource>, TResult> _resultSelector;

        // The fields below are assigned in Run, before any source is subscribed.
        private object _gate;                 // lock taken by every source notification handler
        private bool[] _hasValue;             // per source: has it produced at least one value?
        private bool _hasValueAll;            // cached "every source has a value" flag
        private List<TSource> _values;        // per source: most recent value
        private bool[] _isDone;               // per source: has it completed?
        private IDisposable[] _subscriptions; // per source: its SourceObserver

        /// <param name="resultSelector">Function applied to the list of latest values.</param>
        /// <param name="observer">Downstream observer handed to the base sink.</param>
        public _(Func<IList<TSource>, TResult> resultSelector, IObserver<TResult> observer)
            : base(observer)
        {
            _resultSelector = resultSelector;
        }

        /// <summary>
        /// Materializes <paramref name="sources"/> into an array, allocates the per-source
        /// state, subscribes one <c>SourceObserver</c> to each source, and registers the
        /// composite of all subscriptions as the upstream resource.
        /// </summary>
        /// <param name="sources">The observable sequences to combine.</param>
        public void Run(IEnumerable<IObservable<TSource>> sources)
        {
            var observables = sources.ToArray();
            var count = observables.Length;

            _hasValue = new bool[count];
            _hasValueAll = false;

            // A fresh array is already filled with default(TSource), so copying it
            // yields a list of 'count' default entries.
            _values = new List<TSource>(new TSource[count]);

            _isDone = new bool[count];
            _subscriptions = new IDisposable[count];
            _gate = new object();

            for (var i = 0; i < count; i++)
            {
                var observer = new SourceObserver(this, i);

                // Store the observer before subscribing: a source may signal
                // synchronously, and OnCompleted looks the entry up by index.
                _subscriptions[i] = observer;
                observer.SetResource(observables[i].SubscribeSafe(observer));
            }

            SetUpstream(StableCompositeDisposable.CreateTrusted(_subscriptions));
        }

        /// <summary>
        /// Records <paramref name="value"/> as the latest value of the source at
        /// <paramref name="index"/>. If every source has a value, the result selector is
        /// invoked and its result forwarded; an exception thrown by the selector is
        /// forwarded as an error instead.
        /// </summary>
        private void OnNext(int index, TSource value)
        {
            lock (_gate)
            {
                _values[index] = value;
                _hasValue[index] = true;

                // Once the flag has flipped to true it is never recomputed.
                if (!_hasValueAll)
                {
                    // All() is a helper defined elsewhere; presumably true when every flag is set.
                    _hasValueAll = _hasValue.All();
                }

                if (!_hasValueAll)
                {
                    // AllExcept is a helper defined elsewhere; presumably true when every
                    // source other than 'index' is done, in which case no combination
                    // could ever be produced.
                    if (_isDone.AllExcept(index))
                    {
                        ForwardOnCompleted();
                    }

                    return;
                }

                TResult result;
                try
                {
                    // The selector receives a read-only wrapper around the live list, not a copy.
                    result = _resultSelector(new ReadOnlyCollection<TSource>(_values));
                }
                catch (Exception error)
                {
                    ForwardOnError(error);
                    return;
                }

                ForwardOnNext(result);
            }
        }

        /// <summary>Forwards an error from any source downstream, under the lock.</summary>
        private new void OnError(Exception error)
        {
            lock (_gate)
            {
                ForwardOnError(error);
            }
        }

        /// <summary>
        /// Marks the source at <paramref name="index"/> as done. Forwards completion when
        /// the done-flags helper reports all sources done; otherwise disposes only that
        /// source's subscription.
        /// </summary>
        private void OnCompleted(int index)
        {
            lock (_gate)
            {
                _isDone[index] = true;

                if (!_isDone.All())
                {
                    _subscriptions[index].Dispose();
                    return;
                }

                ForwardOnCompleted();
            }
        }
    }

    private readonly IEnumerable<IObservable<TSource>> _sources;
    private readonly Func<IList<TSource>, TResult> _resultSelector;

    /// <param name="sources">The observable sequences to combine.</param>
    /// <param name="resultSelector">Function applied to the list of latest values.</param>
    public CombineLatest(IEnumerable<IObservable<TSource>> sources, Func<IList<TSource>, TResult> resultSelector)
    {
        _sources = sources;
        _resultSelector = resultSelector;
    }

    /// <summary>Creates the sink for <paramref name="observer"/> using the stored result selector.</summary>
    protected override _ CreateSink(IObserver<TResult> observer) => new _(_resultSelector, observer);

    /// <summary>Starts <paramref name="sink"/> on the stored sources.</summary>
    protected override void Run(_ sink) => sink.Run(_sources);
}
}