Zip<TSource>
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Disposables;
using System.Runtime.CompilerServices;
using System.Threading;
namespace System.Reactive.Linq.ObservableImpl
{
[System.Runtime.CompilerServices.NullableContext(1)]
[System.Runtime.CompilerServices.Nullable(new byte[] {
0,
1,
1,
1,
1
})]
internal sealed class Zip<[System.Runtime.CompilerServices.Nullable(2)] TSource> : Producer<IList<TSource>, Zip<TSource>._>
{
[System.Runtime.CompilerServices.Nullable(new byte[] {
0,
1,
1
})]
internal sealed class _ : IdentitySink<IList<TSource>>
{
[System.Runtime.CompilerServices.Nullable(0)]
private sealed class SourceObserver : IObserver<TSource>
{
[System.Runtime.CompilerServices.Nullable(new byte[] {
1,
0
})]
private readonly _ _parent;
private readonly int _index;
public SourceObserver([System.Runtime.CompilerServices.Nullable(new byte[] {
1,
0
})] _ parent, int index)
{
_parent = parent;
_index = index;
}
public void OnNext(TSource value)
{
_parent.OnNext(_index, value);
}
public void OnError(Exception error)
{
_parent.OnError(error);
}
public void OnCompleted()
{
_parent.OnCompleted(_index);
}
}
private readonly object _gate;
private Queue<TSource>[] _queues;
private bool[] _isDone;
[System.Runtime.CompilerServices.Nullable(2)]
private SingleAssignmentDisposableValue[] _subscriptions;
public _(IObserver<IList<TSource>> observer)
: base(observer)
{
_gate = new object();
_queues = null;
_isDone = null;
}
public void Run(IEnumerable<IObservable<TSource>> sources)
{
IObservable<TSource>[] array = Enumerable.ToArray<IObservable<TSource>>(sources);
int num = array.Length;
_queues = new Queue<TSource>[num];
for (int i = 0; i < num; i++) {
_queues[i] = new Queue<TSource>();
}
_isDone = new bool[num];
SingleAssignmentDisposableValue[] array2 = new SingleAssignmentDisposableValue[num];
if (Interlocked.CompareExchange<SingleAssignmentDisposableValue[]>(ref _subscriptions, array2, (SingleAssignmentDisposableValue[])null) == null) {
for (int j = 0; j < num; j++) {
SourceObserver observer = new SourceObserver(this, j);
array2[j].Disposable = ObservableExtensions.SubscribeSafe<TSource>(array[j], (IObserver<TSource>)observer);
}
}
}
protected override void Dispose(bool disposing)
{
if (disposing) {
SingleAssignmentDisposableValue[] array = Interlocked.Exchange<SingleAssignmentDisposableValue[]>(ref _subscriptions, Array.Empty<SingleAssignmentDisposableValue>());
if (array != null && array != Array.Empty<SingleAssignmentDisposableValue>()) {
for (int i = 0; i < array.Length; i++) {
array[i].Dispose();
}
lock (_gate) {
Queue<TSource>[] queues = _queues;
for (int j = 0; j < queues.Length; j++) {
queues[j].Clear();
}
}
}
}
base.Dispose(disposing);
}
private void OnNext(int index, TSource value)
{
lock (_gate) {
_queues[index].Enqueue(value);
if (Enumerable.All<Queue<TSource>>((IEnumerable<Queue<TSource>>)_queues, (Func<Queue<TSource>, bool>)((Queue<TSource> q) => q.Count > 0))) {
int num = _queues.Length;
List<TSource> list = new List<TSource>(num);
for (int i = 0; i < num; i++) {
list.Add(_queues[i].Dequeue());
}
ForwardOnNext(list);
} else if (_isDone.AllExcept(index)) {
ForwardOnCompleted();
}
}
}
private new void OnError(Exception error)
{
lock (_gate) {
ForwardOnError(error);
}
}
private void OnCompleted(int index)
{
lock (_gate) {
_isDone[index] = true;
if (_isDone.All())
ForwardOnCompleted();
else {
SingleAssignmentDisposableValue[] array = Volatile.Read<SingleAssignmentDisposableValue[]>(ref _subscriptions);
if (array != null && array != Array.Empty<SingleAssignmentDisposableValue>())
array[index].Dispose();
}
}
}
}
private readonly IEnumerable<IObservable<TSource>> _sources;
public Zip(IEnumerable<IObservable<TSource>> sources)
{
_sources = sources;
}
[return: System.Runtime.CompilerServices.Nullable(new byte[] {
1,
0
})]
protected override _ CreateSink(IObserver<IList<TSource>> observer)
{
return new _(observer);
}
protected override void Run([System.Runtime.CompilerServices.Nullable(new byte[] {
1,
0
})] _ sink)
{
sink.Run(_sources);
}
}
}