<PackageReference Include="System.Reactive" Version="6.0.0-preview.9" />

Zip<TSource>

sealed class Zip<TSource> : Producer<IList<TSource>, _<TSource>>
using System.Collections.Generic; using System.Linq; using System.Reactive.Disposables; using System.Runtime.CompilerServices; using System.Threading; namespace System.Reactive.Linq.ObservableImpl { [System.Runtime.CompilerServices.NullableContext(1)] [System.Runtime.CompilerServices.Nullable(new byte[] { 0, 1, 1, 1, 1 })] internal sealed class Zip<[System.Runtime.CompilerServices.Nullable(2)] TSource> : Producer<IList<TSource>, Zip<TSource>._> { [System.Runtime.CompilerServices.Nullable(new byte[] { 0, 1, 1 })] internal sealed class _ : IdentitySink<IList<TSource>> { [System.Runtime.CompilerServices.Nullable(0)] private sealed class SourceObserver : IObserver<TSource> { [System.Runtime.CompilerServices.Nullable(new byte[] { 1, 0 })] private readonly _ _parent; private readonly int _index; public SourceObserver([System.Runtime.CompilerServices.Nullable(new byte[] { 1, 0 })] _ parent, int index) { _parent = parent; _index = index; } public void OnNext(TSource value) { _parent.OnNext(_index, value); } public void OnError(Exception error) { _parent.OnError(error); } public void OnCompleted() { _parent.OnCompleted(_index); } } private readonly object _gate; private Queue<TSource>[] _queues; private bool[] _isDone; [System.Runtime.CompilerServices.Nullable(2)] private SingleAssignmentDisposableValue[] _subscriptions; public _(IObserver<IList<TSource>> observer) : base(observer) { _gate = new object(); _queues = null; _isDone = null; } public void Run(IEnumerable<IObservable<TSource>> sources) { IObservable<TSource>[] array = Enumerable.ToArray<IObservable<TSource>>(sources); int num = array.Length; _queues = new Queue<TSource>[num]; for (int i = 0; i < num; i++) { _queues[i] = new Queue<TSource>(); } _isDone = new bool[num]; SingleAssignmentDisposableValue[] array2 = new SingleAssignmentDisposableValue[num]; if (Interlocked.CompareExchange<SingleAssignmentDisposableValue[]>(ref _subscriptions, array2, (SingleAssignmentDisposableValue[])null) == null) { for (int j = 0; j < num; j++) { SourceObserver observer = new SourceObserver(this, j); array2[j].Disposable = ObservableExtensions.SubscribeSafe<TSource>(array[j], (IObserver<TSource>)observer); } } } protected override void Dispose(bool disposing) { if (disposing) { SingleAssignmentDisposableValue[] array = Interlocked.Exchange<SingleAssignmentDisposableValue[]>(ref _subscriptions, Array.Empty<SingleAssignmentDisposableValue>()); if (array != null && array != Array.Empty<SingleAssignmentDisposableValue>()) { for (int i = 0; i < array.Length; i++) { array[i].Dispose(); } lock (_gate) { Queue<TSource>[] queues = _queues; for (int j = 0; j < queues.Length; j++) { queues[j].Clear(); } } } } base.Dispose(disposing); } private void OnNext(int index, TSource value) { lock (_gate) { _queues[index].Enqueue(value); if (Enumerable.All<Queue<TSource>>((IEnumerable<Queue<TSource>>)_queues, (Func<Queue<TSource>, bool>)((Queue<TSource> q) => q.Count > 0))) { int num = _queues.Length; List<TSource> list = new List<TSource>(num); for (int i = 0; i < num; i++) { list.Add(_queues[i].Dequeue()); } ForwardOnNext(list); } else if (_isDone.AllExcept(index)) { ForwardOnCompleted(); } } } private new void OnError(Exception error) { lock (_gate) { ForwardOnError(error); } } private void OnCompleted(int index) { lock (_gate) { _isDone[index] = true; if (_isDone.All()) ForwardOnCompleted(); else { SingleAssignmentDisposableValue[] array = Volatile.Read<SingleAssignmentDisposableValue[]>(ref _subscriptions); if (array != null && array != Array.Empty<SingleAssignmentDisposableValue>()) array[index].Dispose(); } } } } private readonly IEnumerable<IObservable<TSource>> _sources; public Zip(IEnumerable<IObservable<TSource>> sources) { _sources = sources; } [return: System.Runtime.CompilerServices.Nullable(new byte[] { 1, 0 })] protected override _ CreateSink(IObserver<IList<TSource>> observer) { return new _(observer); } protected override void Run([System.Runtime.CompilerServices.Nullable(new byte[] { 1, 0 })] _ sink) { sink.Run(_sources); } } }