<PackageReference Include="System.Reactive" Version="4.2.0-preview.102" />

Zip<TSource>

sealed class Zip<TSource> : Producer<IList<TSource>, _<TSource>>
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Disposables;
using System.Threading;

namespace System.Reactive.Linq.ObservableImpl
{
    /// <summary>
    /// Producer that combines an arbitrary number of observable sequences element-wise:
    /// the n-th value of every source is gathered into one <see cref="IList{T}"/>.
    /// </summary>
    /// <typeparam name="TSource">Element type shared by all source sequences.</typeparam>
    internal sealed class Zip<TSource> : Producer<IList<TSource>, Zip<TSource>._>
    {
        private readonly IEnumerable<IObservable<TSource>> _sources;

        /// <summary>Stores the sequences to combine; nothing is enumerated or subscribed here.</summary>
        /// <param name="sources">The sequences handed to the sink when it is run.</param>
        public Zip(IEnumerable<IObservable<TSource>> sources)
        {
            _sources = sources;
        }

        /// <summary>Creates the sink that wraps <paramref name="observer"/>.</summary>
        protected override _ CreateSink(IObserver<IList<TSource>> observer)
        {
            return new _(observer);
        }

        /// <summary>Starts <paramref name="sink"/> on the stored sources.</summary>
        protected override void Run(_ sink)
        {
            sink.Run(_sources);
        }

        /// <summary>
        /// Sink holding one pending-value queue and one completion flag per source.
        /// All queue and flag access happens under a single gate.
        /// </summary>
        internal sealed class _ : IdentitySink<IList<TSource>>
        {
            private readonly object _gate;
            private Queue<TSource>[] _queues;
            private bool[] _isDone;

            // Three states: null (Run has not installed anything yet), the array created by Run,
            // or Array.Empty<IDisposable>() once Dispose has swapped it in.
            private IDisposable[] _subscriptions;

            public _(IObserver<IList<TSource>> observer)
                : base(observer)
            {
                _gate = new object();
            }

            /// <summary>
            /// Materializes <paramref name="sources"/>, allocates the per-source state and subscribes
            /// one indexed observer to each source.
            /// </summary>
            /// <param name="sources">The sequences to combine; enumerated exactly once.</param>
            public void Run(IEnumerable<IObservable<TSource>> sources)
            {
                var inputs = sources.ToArray();
                var count = inputs.Length;

                _queues = new Queue<TSource>[count];
                for (var slot = 0; slot < count; slot++)
                {
                    _queues[slot] = new Queue<TSource>();
                }

                _isDone = new bool[count];

                var handles = new IDisposable[count];

                // Install the handle array only if the field is still null; when Dispose has already
                // replaced it with the empty array, no source gets subscribed at all.
                if (Interlocked.CompareExchange(ref _subscriptions, handles, null) != null)
                {
                    return;
                }

                for (var slot = 0; slot < count; slot++)
                {
                    var inner = new SourceObserver(this, slot);
                    Disposable.SetSingle(ref handles[slot], inputs[slot].SubscribeSafe(inner));
                }
            }

            /// <summary>
            /// On a disposing call, swaps the subscription array for the empty array, disposes every
            /// handle of the array that was installed, and clears all queues under the gate.
            /// The base implementation is always invoked afterwards.
            /// </summary>
            protected override void Dispose(bool disposing)
            {
                if (disposing)
                {
                    var previous = Interlocked.Exchange(ref _subscriptions, Array.Empty<IDisposable>());
                    if (IsInstalled(previous))
                    {
                        for (var slot = 0; slot < previous.Length; slot++)
                        {
                            Disposable.TryDispose(ref previous[slot]);
                        }

                        lock (_gate)
                        {
                            foreach (var pending in _queues)
                            {
                                pending.Clear();
                            }
                        }
                    }
                }

                base.Dispose(disposing);
            }

            /// <summary>
            /// True when <paramref name="handles"/> is neither null nor the shared empty array,
            /// i.e. it is the array that <see cref="Run"/> installed.
            /// </summary>
            private static bool IsInstalled(IDisposable[] handles)
            {
                return handles != null && handles != Array.Empty<IDisposable>();
            }

            /// <summary>
            /// Queues <paramref name="value"/> for source <paramref name="index"/>. When every queue
            /// holds at least one value, one value per queue is dequeued (in source order) and forwarded
            /// as a list. Otherwise, if the completion-flag helper reports that all other sources are
            /// done, completion is forwarded.
            /// </summary>
            private void OnNext(int index, TSource value)
            {
                lock (_gate)
                {
                    _queues[index].Enqueue(value);

                    if (_queues.All(queue => queue.Count > 0))
                    {
                        var width = _queues.Length;
                        var row = new List<TSource>(width);
                        foreach (var pending in _queues)
                        {
                            row.Add(pending.Dequeue());
                        }

                        ForwardOnNext(row);
                    }
                    else if (_isDone.AllExcept(index))
                    {
                        ForwardOnCompleted();
                    }
                }
            }

            /// <summary>Forwards an error from any source while holding the gate.</summary>
            private new void OnError(Exception error)
            {
                lock (_gate)
                {
                    ForwardOnError(error);
                }
            }

            /// <summary>
            /// Flags source <paramref name="index"/> as done. Forwards completion once every source
            /// is flagged; otherwise only tries to dispose the subscription of the finished source.
            /// </summary>
            private void OnCompleted(int index)
            {
                lock (_gate)
                {
                    _isDone[index] = true;

                    if (_isDone.All())
                    {
                        ForwardOnCompleted();
                        return;
                    }

                    var current = Volatile.Read(ref _subscriptions);
                    if (IsInstalled(current))
                    {
                        Disposable.TryDispose(ref current[index]);
                    }
                }
            }

            /// <summary>
            /// Observer attached to a single source; tags notifications with the source's index
            /// and relays them to the owning sink.
            /// </summary>
            private sealed class SourceObserver : IObserver<TSource>
            {
                private readonly _ _parent;
                private readonly int _index;

                public SourceObserver(_ parent, int index)
                {
                    _parent = parent;
                    _index = index;
                }

                public void OnNext(TSource value)
                {
                    _parent.OnNext(_index, value);
                }

                public void OnError(Exception error)
                {
                    _parent.OnError(error);
                }

                public void OnCompleted()
                {
                    _parent.OnCompleted(_index);
                }
            }
        }
    }
}