Zip<TFirst, TSecond, TResult>
using System.Collections.Generic;
using System.Reactive.Disposables;
using System.Runtime.CompilerServices;
using System.Threading;
namespace System.Reactive.Linq.ObservableImpl
{
[System.Runtime.CompilerServices.NullableContext(2)]
[System.Runtime.CompilerServices.Nullable(0)]
internal static class Zip<TFirst, TSecond, TResult>
{
[System.Runtime.CompilerServices.NullableContext(1)]
[System.Runtime.CompilerServices.Nullable(new byte[] {
0,
1,
1,
0,
0,
0
})]
internal sealed class Observable : Producer<TResult, Observable._>
{
[System.Runtime.CompilerServices.Nullable(new byte[] {
0,
1
})]
internal sealed class _ : IdentitySink<TResult>
{
[System.Runtime.CompilerServices.Nullable(0)]
private sealed class FirstObserver : IObserver<TFirst>, IDisposable
{
[System.Runtime.CompilerServices.Nullable(new byte[] {
1,
0,
0,
0
})]
private readonly _ _parent;
private readonly Queue<TFirst> _queue;
[System.Runtime.CompilerServices.Nullable(new byte[] {
1,
0,
0,
0
})]
private SecondObserver _other;
public Queue<TFirst> Queue => _queue;
public bool Done { get; set; }
public FirstObserver([System.Runtime.CompilerServices.Nullable(new byte[] {
1,
0,
0,
0
})] _ parent)
{
_parent = parent;
_queue = new Queue<TFirst>();
_other = null;
}
public void SetOther([System.Runtime.CompilerServices.Nullable(new byte[] {
1,
0,
0,
0
})] SecondObserver other)
{
_other = other;
}
public void OnNext(TFirst value)
{
lock (_parent._gate) {
if (_other.Queue.Count > 0) {
TSecond arg = _other.Queue.Dequeue();
TResult value2;
try {
value2 = _parent._resultSelector(value, arg);
} catch (Exception error) {
_parent.ForwardOnError(error);
return;
}
_parent.ForwardOnNext(value2);
} else if (_other.Done) {
_parent.ForwardOnCompleted();
} else {
_queue.Enqueue(value);
}
}
}
public void OnError(Exception error)
{
lock (_parent._gate) {
_parent.ForwardOnError(error);
}
}
public void OnCompleted()
{
lock (_parent._gate) {
Done = true;
if (_other.Done)
_parent.ForwardOnCompleted();
else
_parent._firstDisposable.Dispose();
}
}
public void Dispose()
{
_queue.Clear();
}
}
[System.Runtime.CompilerServices.Nullable(0)]
private sealed class SecondObserver : IObserver<TSecond>, IDisposable
{
[System.Runtime.CompilerServices.Nullable(new byte[] {
1,
0,
0,
0
})]
private readonly _ _parent;
private readonly Queue<TSecond> _queue;
[System.Runtime.CompilerServices.Nullable(new byte[] {
1,
0,
0,
0
})]
private FirstObserver _other;
public Queue<TSecond> Queue => _queue;
public bool Done { get; set; }
public SecondObserver([System.Runtime.CompilerServices.Nullable(new byte[] {
1,
0,
0,
0
})] _ parent)
{
_parent = parent;
_queue = new Queue<TSecond>();
_other = null;
}
public void SetOther([System.Runtime.CompilerServices.Nullable(new byte[] {
1,
0,
0,
0
})] FirstObserver other)
{
_other = other;
}
public void OnNext(TSecond value)
{
lock (_parent._gate) {
if (_other.Queue.Count > 0) {
TFirst arg = _other.Queue.Dequeue();
TResult value2;
try {
value2 = _parent._resultSelector(arg, value);
} catch (Exception error) {
_parent.ForwardOnError(error);
return;
}
_parent.ForwardOnNext(value2);
} else if (_other.Done) {
_parent.ForwardOnCompleted();
} else {
_queue.Enqueue(value);
}
}
}
public void OnError(Exception error)
{
lock (_parent._gate) {
_parent.ForwardOnError(error);
}
}
public void OnCompleted()
{
lock (_parent._gate) {
Done = true;
if (_other.Done)
_parent.ForwardOnCompleted();
else
_parent._secondDisposable.Dispose();
}
}
public void Dispose()
{
_queue.Clear();
}
}
private readonly Func<TFirst, TSecond, TResult> _resultSelector;
private readonly object _gate;
[System.Runtime.CompilerServices.Nullable(new byte[] {
1,
0,
0,
0
})]
private readonly FirstObserver _firstObserver;
private SingleAssignmentDisposableValue _firstDisposable;
[System.Runtime.CompilerServices.Nullable(new byte[] {
1,
0,
0,
0
})]
private readonly SecondObserver _secondObserver;
private SingleAssignmentDisposableValue _secondDisposable;
public _(Func<TFirst, TSecond, TResult> resultSelector, IObserver<TResult> observer)
: base(observer)
{
_gate = new object();
_firstObserver = new FirstObserver(this);
_secondObserver = new SecondObserver(this);
_firstObserver.SetOther(_secondObserver);
_secondObserver.SetOther(_firstObserver);
_resultSelector = resultSelector;
}
public void Run(IObservable<TFirst> first, IObservable<TSecond> second)
{
_firstDisposable.Disposable = ObservableExtensions.SubscribeSafe<TFirst>(first, (IObserver<TFirst>)_firstObserver);
_secondDisposable.Disposable = ObservableExtensions.SubscribeSafe<TSecond>(second, (IObserver<TSecond>)_secondObserver);
}
protected override void Dispose(bool disposing)
{
if (disposing) {
_firstDisposable.Dispose();
_secondDisposable.Dispose();
lock (_gate) {
_firstObserver.Dispose();
_secondObserver.Dispose();
}
}
base.Dispose(disposing);
}
}
private readonly IObservable<TFirst> _first;
private readonly IObservable<TSecond> _second;
private readonly Func<TFirst, TSecond, TResult> _resultSelector;
public Observable(IObservable<TFirst> first, IObservable<TSecond> second, Func<TFirst, TSecond, TResult> resultSelector)
{
_first = first;
_second = second;
_resultSelector = resultSelector;
}
[return: System.Runtime.CompilerServices.Nullable(new byte[] {
1,
0,
0,
0
})]
protected override _ CreateSink(IObserver<TResult> observer)
{
return new _(_resultSelector, observer);
}
protected override void Run([System.Runtime.CompilerServices.Nullable(new byte[] {
1,
0,
0,
0
})] _ sink)
{
sink.Run(_first, _second);
}
}
[System.Runtime.CompilerServices.NullableContext(1)]
[System.Runtime.CompilerServices.Nullable(new byte[] {
0,
1,
1,
0,
0,
0
})]
internal sealed class Enumerable : Producer<TResult, Enumerable._>
{
[System.Runtime.CompilerServices.Nullable(new byte[] {
0,
1,
1
})]
internal sealed class _ : Sink<TFirst, TResult>
{
private readonly Func<TFirst, TSecond, TResult> _resultSelector;
private int _enumerationInProgress;
[System.Runtime.CompilerServices.Nullable(new byte[] {
2,
1
})]
private IEnumerator<TSecond> _rightEnumerator;
private static readonly IEnumerator<TSecond> DisposedEnumerator = MakeDisposedEnumerator();
public _(Func<TFirst, TSecond, TResult> resultSelector, IObserver<TResult> observer)
: base(observer)
{
_resultSelector = resultSelector;
}
private static IEnumerator<TSecond> MakeDisposedEnumerator()
{
yield break;
}
public void Run(IObservable<TFirst> first, IEnumerable<TSecond> second)
{
try {
IEnumerator<TSecond> enumerator = second.GetEnumerator();
if (Interlocked.CompareExchange<IEnumerator<TSecond>>(ref _rightEnumerator, enumerator, (IEnumerator<TSecond>)null) != null) {
enumerator.Dispose();
return;
}
} catch (Exception error) {
ForwardOnError(error);
return;
}
Run(first);
}
protected override void Dispose(bool disposing)
{
if (disposing && Interlocked.Increment(ref _enumerationInProgress) == 1)
Interlocked.Exchange<IEnumerator<TSecond>>(ref _rightEnumerator, DisposedEnumerator)?.Dispose();
base.Dispose(disposing);
}
public override void OnNext(TFirst value)
{
IEnumerator<TSecond> enumerator = Volatile.Read<IEnumerator<TSecond>>(ref _rightEnumerator);
if (enumerator != DisposedEnumerator && Interlocked.Increment(ref _enumerationInProgress) == 1) {
TSecond arg = default(TSecond);
bool flag = false;
bool flag2 = default(bool);
try {
try {
flag2 = enumerator.MoveNext();
if (flag2)
arg = enumerator.Current;
} finally {
if (Interlocked.Decrement(ref _enumerationInProgress) != 0) {
Interlocked.Exchange<IEnumerator<TSecond>>(ref _rightEnumerator, DisposedEnumerator)?.Dispose();
flag = true;
}
}
} catch (Exception error) {
ForwardOnError(error);
return;
}
if (!flag) {
if (flag2) {
TResult value2;
try {
value2 = _resultSelector(value, arg);
} catch (Exception error2) {
ForwardOnError(error2);
return;
}
ForwardOnNext(value2);
} else
ForwardOnCompleted();
}
}
}
}
private readonly IObservable<TFirst> _first;
private readonly IEnumerable<TSecond> _second;
private readonly Func<TFirst, TSecond, TResult> _resultSelector;
public Enumerable(IObservable<TFirst> first, IEnumerable<TSecond> second, Func<TFirst, TSecond, TResult> resultSelector)
{
_first = first;
_second = second;
_resultSelector = resultSelector;
}
[return: System.Runtime.CompilerServices.Nullable(new byte[] {
1,
0,
0,
0
})]
protected override _ CreateSink(IObserver<TResult> observer)
{
return new _(_resultSelector, observer);
}
protected override void Run([System.Runtime.CompilerServices.Nullable(new byte[] {
1,
0,
0,
0
})] _ sink)
{
sink.Run(_first, _second);
}
}
}
}