<PackageReference Include="System.Reactive" Version="6.0.1" />

TailRecursiveSink<TSource>

abstract class TailRecursiveSink<TSource> : IdentitySink<TSource>
using System.Collections.Generic; using System.Diagnostics.CodeAnalysis; using System.Reactive.Disposables; using System.Runtime.CompilerServices; using System.Threading; namespace System.Reactive { [System.Runtime.CompilerServices.NullableContext(1)] [System.Runtime.CompilerServices.Nullable(new byte[] { 0, 1 })] internal abstract class TailRecursiveSink<[System.Runtime.CompilerServices.Nullable(2)] TSource> : IdentitySink<TSource> { private readonly Stack<IEnumerator<IObservable<TSource>>> _stack = new Stack<IEnumerator<IObservable<TSource>>>(); private bool _isDisposed; private int _trampoline; [System.Runtime.CompilerServices.Nullable(2)] private IDisposable _currentSubscription; protected TailRecursiveSink(IObserver<TSource> observer) : base(observer) { } public void Run(IEnumerable<IObservable<TSource>> sources) { if (TryGetEnumerator(sources, out IEnumerator<IObservable<TSource>> result)) { _stack.Push(result); Drain(); } } protected override void Dispose(bool disposing) { if (disposing) DisposeAll(); base.Dispose(disposing); } private void Drain() { if (Interlocked.Increment(ref _trampoline) == 1) { do { if (Volatile.Read(ref _isDisposed)) { while (_stack.Count != 0) { _stack.Pop().Dispose(); } Disposable.Dispose(ref _currentSubscription); } else if (_stack.Count != 0) { IEnumerator<IObservable<TSource>> enumerator = _stack.Peek(); IObservable<TSource> source = null; try { if (enumerator.MoveNext()) source = enumerator.Current; } catch (Exception error) { enumerator.Dispose(); ForwardOnError(error); Volatile.Write(ref _isDisposed, true); goto IL_000f; } IObservable<TSource> observable; try { observable = <Drain>g__Unpack|7_0(source); } catch (Exception error2) { if (!Fail(error2)) Volatile.Write(ref _isDisposed, true); goto IL_000f; } if (observable == null) { _stack.Pop(); enumerator.Dispose(); goto IL_000f; } IEnumerable<IObservable<TSource>> enumerable = Extract(observable); if (enumerable != null) { if (TryGetEnumerator(enumerable, out IEnumerator<IObservable<TSource>> result)) _stack.Push(result); else Volatile.Write(ref _isDisposed, true); goto IL_000f; } IDisposable ready = ReadyToken.Ready; if (Disposable.TrySetSingle(ref _currentSubscription, ready) != 0) goto IL_000f; IDisposable disposable = ObservableExtensions.SubscribeSafe<TSource>(observable, (IObserver<TSource>)this); IDisposable disposable2 = Interlocked.CompareExchange<IDisposable>(ref _currentSubscription, disposable, ready); if (disposable2 != ready) { disposable.Dispose(); if (disposable2 == BooleanDisposable.True) goto IL_000f; } } else { Volatile.Write(ref _isDisposed, true); Done(); } } while (Interlocked.Decrement(ref _trampoline) != 0); } } private void DisposeAll() { Volatile.Write(ref _isDisposed, true); Drain(); } protected void Recurse() { if (Disposable.TrySetSerial(ref _currentSubscription, null)) Drain(); } [return: System.Runtime.CompilerServices.Nullable(new byte[] { 2, 1, 1 })] protected abstract IEnumerable<IObservable<TSource>> Extract(IObservable<TSource> source); private bool TryGetEnumerator(IEnumerable<IObservable<TSource>> sources, [System.Runtime.CompilerServices.Nullable(new byte[] { 2, 1, 1 })] [System.Diagnostics.CodeAnalysis.NotNullWhen(true)] out IEnumerator<IObservable<TSource>> result) { try { result = sources.GetEnumerator(); return true; } catch (Exception error) { ForwardOnError(error); result = null; return false; } } protected virtual void Done() { ForwardOnCompleted(); } protected virtual bool Fail(Exception error) { ForwardOnError(error); return false; } } }