<PackageReference Include="System.Reactive" Version="6.0.0-preview.13" />

CombineLatest<TSource, TResult>

sealed class CombineLatest<TSource, TResult> : Producer<TResult, _<TSource, TResult>>
using System.Collections.Generic; using System.Collections.ObjectModel; using System.Linq; using System.Reactive.Disposables; using System.Runtime.CompilerServices; namespace System.Reactive.Linq.ObservableImpl { [System.Runtime.CompilerServices.NullableContext(1)] [System.Runtime.CompilerServices.Nullable(new byte[] { 0, 1, 1, 1, 1 })] internal sealed class CombineLatest<[System.Runtime.CompilerServices.Nullable(2)] TSource, [System.Runtime.CompilerServices.Nullable(2)] TResult> : Producer<TResult, CombineLatest<TSource, TResult>._> { [System.Runtime.CompilerServices.Nullable(new byte[] { 0, 1 })] internal sealed class _ : IdentitySink<TResult> { [System.Runtime.CompilerServices.Nullable(new byte[] { 0, 1 })] private sealed class SourceObserver : SafeObserver<TSource> { [System.Runtime.CompilerServices.Nullable(new byte[] { 1, 0, 0 })] private readonly _ _parent; private readonly int _index; public SourceObserver([System.Runtime.CompilerServices.Nullable(new byte[] { 1, 0, 0 })] _ parent, int index) { _parent = parent; _index = index; } public override void OnNext(TSource value) { _parent.OnNext(_index, value); } public override void OnError(Exception error) { _parent.OnError(error); } public override void OnCompleted() { _parent.OnCompleted(_index); } } private readonly object _gate = new object(); private readonly Func<IList<TSource>, TResult> _resultSelector; private bool[] _hasValue; private bool _hasValueAll; private TSource[] _values; private bool[] _isDone; private IDisposable[] _subscriptions; public _(Func<IList<TSource>, TResult> resultSelector, IObserver<TResult> observer) : base(observer) { _resultSelector = resultSelector; _hasValue = null; _values = null; _isDone = null; _subscriptions = null; } public void Run(IEnumerable<IObservable<TSource>> sources) { IObservable<TSource>[] array = Enumerable.ToArray<IObservable<TSource>>(sources); int num = array.Length; _hasValue = new bool[num]; _hasValueAll = false; _values = new TSource[num]; _isDone = new bool[num]; _subscriptions = new IDisposable[num]; for (int i = 0; i < num; i++) { int num2 = i; SourceObserver sourceObserver = new SourceObserver(this, num2); _subscriptions[num2] = sourceObserver; sourceObserver.SetResource(ObservableExtensions.SubscribeSafe<TSource>(array[num2], (IObserver<TSource>)sourceObserver)); } SetUpstream(StableCompositeDisposable.CreateTrusted(_subscriptions)); } private void OnNext(int index, TSource value) { lock (_gate) { _values[index] = value; _hasValue[index] = true; if (_hasValueAll || (_hasValueAll = _hasValue.All())) { TResult value2; try { value2 = _resultSelector(new ReadOnlyCollection<TSource>(_values)); } catch (Exception error) { ForwardOnError(error); return; } ForwardOnNext(value2); } else if (_isDone.AllExcept(index)) { ForwardOnCompleted(); } } } private new void OnError(Exception error) { lock (_gate) { ForwardOnError(error); } } private void OnCompleted(int index) { lock (_gate) { _isDone[index] = true; if (_isDone.All()) ForwardOnCompleted(); else _subscriptions[index].Dispose(); } } } private readonly IEnumerable<IObservable<TSource>> _sources; private readonly Func<IList<TSource>, TResult> _resultSelector; public CombineLatest(IEnumerable<IObservable<TSource>> sources, Func<IList<TSource>, TResult> resultSelector) { _sources = sources; _resultSelector = resultSelector; } [return: System.Runtime.CompilerServices.Nullable(new byte[] { 1, 0, 0 })] protected override _ CreateSink(IObserver<TResult> observer) { return new _(_resultSelector, observer); } protected override void Run([System.Runtime.CompilerServices.Nullable(new byte[] { 1, 0, 0 })] _ sink) { sink.Run(_sources); } } }