SkipUntil<TSource, TOther>
using System.Reactive.Disposables;
using System.Runtime.CompilerServices;
namespace System.Reactive.Linq.ObservableImpl
{
[System.Runtime.CompilerServices.NullableContext(1)]
[System.Runtime.CompilerServices.Nullable(new byte[] {
0,
1,
1,
1,
1
})]
internal sealed class SkipUntil<[System.Runtime.CompilerServices.Nullable(2)] TSource, [System.Runtime.CompilerServices.Nullable(2)] TOther> : Producer<TSource, SkipUntil<TSource, TOther>._>
{
[System.Runtime.CompilerServices.Nullable(new byte[] {
0,
1
})]
internal sealed class _ : IdentitySink<TSource>
{
[System.Runtime.CompilerServices.NullableContext(0)]
private sealed class OtherObserver : IObserver<TOther>, IDisposable
{
[System.Runtime.CompilerServices.Nullable(new byte[] {
1,
0,
0
})]
private readonly _ _parent;
public OtherObserver([System.Runtime.CompilerServices.Nullable(new byte[] {
1,
0,
0
})] _ parent)
{
_parent = parent;
}
public void Dispose()
{
if (!_parent._otherDisposable.IsDisposed)
_parent._otherDisposable.Dispose();
}
public void OnCompleted()
{
Dispose();
}
[System.Runtime.CompilerServices.NullableContext(1)]
public void OnError(Exception error)
{
HalfSerializer.ForwardOnError<TSource>((ISink<TSource>)_parent, error, ref _parent._halfSerializer, ref _parent._error);
}
[System.Runtime.CompilerServices.NullableContext(1)]
public void OnNext(TOther value)
{
_parent.OtherComplete();
Dispose();
}
}
private SingleAssignmentDisposableValue _otherDisposable;
private bool _forward;
private int _halfSerializer;
[System.Runtime.CompilerServices.Nullable(2)]
private Exception _error;
public _(IObserver<TSource> observer)
: base(observer)
{
}
public void Run(SkipUntil<TSource, TOther> parent)
{
_otherDisposable.Disposable = parent._other.Subscribe(new OtherObserver(this));
Run(parent._source);
}
protected override void Dispose(bool disposing)
{
if (disposing && !_otherDisposable.IsDisposed)
_otherDisposable.Dispose();
base.Dispose(disposing);
}
public override void OnNext(TSource value)
{
if (_forward)
HalfSerializer.ForwardOnNext<TSource>((ISink<TSource>)this, value, ref _halfSerializer, ref _error);
}
public override void OnError(Exception ex)
{
HalfSerializer.ForwardOnError<TSource>((ISink<TSource>)this, ex, ref _halfSerializer, ref _error);
}
public override void OnCompleted()
{
if (_forward)
HalfSerializer.ForwardOnCompleted<TSource>((ISink<TSource>)this, ref _halfSerializer, ref _error);
else
DisposeUpstream();
}
private void OtherComplete()
{
_forward = true;
}
}
private readonly IObservable<TSource> _source;
private readonly IObservable<TOther> _other;
public SkipUntil(IObservable<TSource> source, IObservable<TOther> other)
{
_source = source;
_other = other;
}
[return: System.Runtime.CompilerServices.Nullable(new byte[] {
1,
0,
0
})]
protected override _ CreateSink(IObserver<TSource> observer)
{
return new _(observer);
}
protected override void Run([System.Runtime.CompilerServices.Nullable(new byte[] {
1,
0,
0
})] _ sink)
{
sink.Run(this);
}
}
}