<PackageReference Include="System.Reactive" Version="4.1.5" />

CombineLatest<TSource, TResult>

sealed class CombineLatest<TSource, TResult> : Producer<TResult, _<TSource, TResult>>
using System.Collections.Generic; using System.Collections.ObjectModel; using System.Linq; using System.Reactive.Disposables; namespace System.Reactive.Linq.ObservableImpl { internal sealed class CombineLatest<TSource, TResult> : Producer<TResult, CombineLatest<TSource, TResult>._> { internal sealed class _ : IdentitySink<TResult> { private sealed class SourceObserver : SafeObserver<TSource> { private readonly _ _parent; private readonly int _index; public SourceObserver(_ parent, int index) { _parent = parent; _index = index; } public override void OnNext(TSource value) { _parent.OnNext(_index, value); } public override void OnError(Exception error) { _parent.OnError(error); } public override void OnCompleted() { _parent.OnCompleted(_index); } } private readonly Func<IList<TSource>, TResult> _resultSelector; private object _gate; private bool[] _hasValue; private bool _hasValueAll; private List<TSource> _values; private bool[] _isDone; private IDisposable[] _subscriptions; public _(Func<IList<TSource>, TResult> resultSelector, IObserver<TResult> observer) : base(observer) { _resultSelector = resultSelector; } public void Run(IEnumerable<IObservable<TSource>> sources) { IObservable<TSource>[] array = Enumerable.ToArray<IObservable<TSource>>(sources); int num = array.Length; _hasValue = new bool[num]; _hasValueAll = false; _values = new List<TSource>(num); for (int i = 0; i < num; i++) { _values.Add(default(TSource)); } _isDone = new bool[num]; _subscriptions = new IDisposable[num]; _gate = new object(); for (int j = 0; j < num; j++) { int num2 = j; SourceObserver sourceObserver = new SourceObserver(this, num2); _subscriptions[num2] = sourceObserver; sourceObserver.SetResource(ObservableExtensions.SubscribeSafe<TSource>(array[num2], (IObserver<TSource>)sourceObserver)); } SetUpstream(StableCompositeDisposable.CreateTrusted(_subscriptions)); } private void OnNext(int index, TSource value) { lock (_gate) { _values[index] = value; _hasValue[index] = true; if (_hasValueAll || (_hasValueAll = _hasValue.All())) { TResult val = default(TResult); try { val = _resultSelector(new ReadOnlyCollection<TSource>(_values)); } catch (Exception error) { ForwardOnError(error); return; } ForwardOnNext(val); } else if (_isDone.AllExcept(index)) { ForwardOnCompleted(); } } } private new void OnError(Exception error) { lock (_gate) { ForwardOnError(error); } } private void OnCompleted(int index) { lock (_gate) { _isDone[index] = true; if (_isDone.All()) ForwardOnCompleted(); else _subscriptions[index].Dispose(); } } } private readonly IEnumerable<IObservable<TSource>> _sources; private readonly Func<IList<TSource>, TResult> _resultSelector; public CombineLatest(IEnumerable<IObservable<TSource>> sources, Func<IList<TSource>, TResult> resultSelector) { _sources = sources; _resultSelector = resultSelector; } protected override _ CreateSink(IObserver<TResult> observer) { return new _(_resultSelector, observer); } protected override void Run(_ sink) { sink.Run(_sources); } } }