WithLatestFrom<TFirst, TSecond, TResult>
sealed class WithLatestFrom<TFirst, TSecond, TResult> : Producer<TResult, _<TFirst, TSecond, TResult>>
using System.Reactive.Disposables;
using System.Runtime.CompilerServices;
namespace System.Reactive.Linq.ObservableImpl
{
[System.Runtime.CompilerServices.NullableContext(1)]
[System.Runtime.CompilerServices.Nullable(new byte[] {
0,
1,
1,
1,
1,
1
})]
internal sealed class WithLatestFrom<[System.Runtime.CompilerServices.Nullable(2)] TFirst, [System.Runtime.CompilerServices.Nullable(2)] TSecond, [System.Runtime.CompilerServices.Nullable(2)] TResult> : Producer<TResult, WithLatestFrom<TFirst, TSecond, TResult>._>
{
[System.Runtime.CompilerServices.Nullable(new byte[] {
0,
1
})]
internal sealed class _ : IdentitySink<TResult>
{
[System.Runtime.CompilerServices.Nullable(0)]
private sealed class FirstObserver : IObserver<TFirst>
{
[System.Runtime.CompilerServices.Nullable(new byte[] {
1,
0,
0,
0
})]
private readonly _ _parent;
public FirstObserver([System.Runtime.CompilerServices.Nullable(new byte[] {
1,
0,
0,
0
})] _ parent)
{
_parent = parent;
}
public void OnCompleted()
{
lock (_parent._gate) {
_parent.ForwardOnCompleted();
}
}
public void OnError(Exception error)
{
lock (_parent._gate) {
_parent.ForwardOnError(error);
}
}
public void OnNext(TFirst value)
{
if (_parent._hasLatest) {
TSecond latest = default(TSecond);
lock (_parent._latestGate) {
latest = _parent._latest;
}
TResult value2;
try {
value2 = _parent._resultSelector(value, latest);
} catch (Exception error) {
lock (_parent._gate) {
_parent.ForwardOnError(error);
}
return;
}
lock (_parent._gate) {
_parent.ForwardOnNext(value2);
}
}
}
}
[System.Runtime.CompilerServices.Nullable(0)]
private sealed class SecondObserver : IObserver<TSecond>
{
[System.Runtime.CompilerServices.Nullable(new byte[] {
1,
0,
0,
0
})]
private readonly _ _parent;
public SecondObserver([System.Runtime.CompilerServices.Nullable(new byte[] {
1,
0,
0,
0
})] _ parent)
{
_parent = parent;
}
public void OnCompleted()
{
_parent._secondDisposable.Dispose();
}
public void OnError(Exception error)
{
lock (_parent._gate) {
_parent.ForwardOnError(error);
}
}
public void OnNext(TSecond value)
{
lock (_parent._latestGate) {
_parent._latest = value;
}
if (!_parent._hasLatest)
_parent._hasLatest = true;
}
}
private readonly object _gate = new object();
private readonly object _latestGate = new object();
private readonly Func<TFirst, TSecond, TResult> _resultSelector;
private volatile bool _hasLatest;
[System.Runtime.CompilerServices.Nullable(2)]
private TSecond _latest;
private SingleAssignmentDisposableValue _secondDisposable;
public _(Func<TFirst, TSecond, TResult> resultSelector, IObserver<TResult> observer)
: base(observer)
{
_resultSelector = resultSelector;
}
public void Run(IObservable<TFirst> first, IObservable<TSecond> second)
{
FirstObserver observer = new FirstObserver(this);
SecondObserver observer2 = new SecondObserver(this);
_secondDisposable.Disposable = ObservableExtensions.SubscribeSafe<TSecond>(second, (IObserver<TSecond>)observer2);
SetUpstream(ObservableExtensions.SubscribeSafe<TFirst>(first, (IObserver<TFirst>)observer));
}
protected override void Dispose(bool disposing)
{
if (disposing)
_secondDisposable.Dispose();
base.Dispose(disposing);
}
}
private readonly IObservable<TFirst> _first;
private readonly IObservable<TSecond> _second;
private readonly Func<TFirst, TSecond, TResult> _resultSelector;
public WithLatestFrom(IObservable<TFirst> first, IObservable<TSecond> second, Func<TFirst, TSecond, TResult> resultSelector)
{
_first = first;
_second = second;
_resultSelector = resultSelector;
}
[return: System.Runtime.CompilerServices.Nullable(new byte[] {
1,
0,
0,
0
})]
protected override _ CreateSink(IObserver<TResult> observer)
{
return new _(_resultSelector, observer);
}
protected override void Run([System.Runtime.CompilerServices.Nullable(new byte[] {
1,
0,
0,
0
})] _ sink)
{
sink.Run(_first, _second);
}
}
}