<PackageReference Include="Relativity.Transfer.Client" Version="7.2.7" />

Zip<TSource>

sealed class Zip<TSource> : Producer<IList<TSource>, _<TSource>>
using System.Collections.Generic; using System.Linq; using System.Reactive.Disposables; namespace System.Reactive.Linq.ObservableImpl { internal sealed class Zip<TSource> : Producer<IList<TSource>, Zip<TSource>._> { internal sealed class _ : Sink<IList<TSource>> { private sealed class SourceObserver : IObserver<TSource> { private readonly _ _parent; private readonly int _index; public SourceObserver(_ parent, int index) { _parent = parent; _index = index; } public void OnNext(TSource value) { _parent.OnNext(_index, value); } public void OnError(Exception error) { _parent.OnError(error); } public void OnCompleted() { _parent.OnCompleted(_index); } } private readonly Zip<TSource> _parent; private object _gate; private Queue<TSource>[] _queues; private bool[] _isDone; private IDisposable[] _subscriptions; public _(Zip<TSource> parent, IObserver<IList<TSource>> observer, IDisposable cancel) : base(observer, cancel) { _parent = parent; } public IDisposable Run() { IObservable<TSource>[] array = Enumerable.ToArray<IObservable<TSource>>(_parent._sources); int num = array.Length; _queues = new Queue<TSource>[num]; for (int i = 0; i < num; i++) { _queues[i] = new Queue<TSource>(); } _isDone = new bool[num]; IDisposable[] array2 = _subscriptions = new SingleAssignmentDisposable[num]; _gate = new object(); for (int j = 0; j < num; j++) { int num2 = j; SingleAssignmentDisposable singleAssignmentDisposable = new SingleAssignmentDisposable(); _subscriptions[num2] = singleAssignmentDisposable; SourceObserver observer = new SourceObserver(this, num2); singleAssignmentDisposable.Disposable = ObservableExtensions.SubscribeSafe<TSource>(array[num2], (IObserver<TSource>)observer); } return new CompositeDisposable(_subscriptions) { Disposable.Create(delegate { Queue<TSource>[] queues = _queues; for (int k = 0; k < queues.Length; k++) { queues[k].Clear(); } }) }; } private void OnNext(int index, TSource value) { lock (_gate) { _queues[index].Enqueue(value); if (Enumerable.All<Queue<TSource>>((IEnumerable<Queue<TSource>>)_queues, (Func<Queue<TSource>, bool>)((Queue<TSource> q) => q.Count > 0))) { int num = _queues.Length; List<TSource> list = new List<TSource>(num); for (int i = 0; i < num; i++) { list.Add(_queues[i].Dequeue()); } _observer.OnNext(list); } else if (_isDone.AllExcept(index)) { _observer.OnCompleted(); base.Dispose(); } } } private void OnError(Exception error) { lock (_gate) { _observer.OnError(error); base.Dispose(); } } private void OnCompleted(int index) { lock (_gate) { _isDone[index] = true; if (_isDone.All()) { _observer.OnCompleted(); base.Dispose(); } else _subscriptions[index].Dispose(); } } } private readonly IEnumerable<IObservable<TSource>> _sources; public Zip(IEnumerable<IObservable<TSource>> sources) { _sources = sources; } protected override _ CreateSink(IObserver<IList<TSource>> observer, IDisposable cancel) { return new _(this, observer, cancel); } protected override IDisposable Run(_ sink) { return sink.Run(); } } }