<PackageReference Include="System.Reactive" Version="6.0.2" />

Amb<TSource>

internal sealed class Amb<TSource> : Producer<TSource, Amb<TSource>.AmbCoordinator>
// Nullability is expressed through the nullable context below. The decompiled
// form of this type carried hand-written [NullableContext]/[Nullable]
// attributes; those are reserved for the compiler and are rejected when
// written explicitly in source, so they are intentionally not reproduced.
#nullable enable
using System.Runtime.CompilerServices;
using System.Threading;

namespace System.Reactive.Linq.ObservableImpl
{
    /// <summary>
    /// Producer that races two source sequences: whichever source delivers a
    /// notification first (value, error or completion) becomes the winner and
    /// is forwarded to the downstream observer; the other source's observer
    /// is disposed.
    /// </summary>
    /// <typeparam name="TSource">Element type of both source sequences.</typeparam>
    internal sealed class Amb<TSource> : Producer<TSource, Amb<TSource>.AmbCoordinator>
    {
        private readonly IObservable<TSource> _left;
        private readonly IObservable<TSource> _right;

        /// <summary>
        /// Captures the two contending sources.
        /// </summary>
        /// <param name="left">The first contender.</param>
        /// <param name="right">The second contender.</param>
        public Amb(IObservable<TSource> left, IObservable<TSource> right)
        {
            _left = left;
            _right = right;
        }

        /// <summary>
        /// Creates the coordinator that arbitrates between the two sources on
        /// behalf of <paramref name="observer"/>.
        /// </summary>
        protected override AmbCoordinator CreateSink(IObserver<TSource> observer)
        {
            return new AmbCoordinator(observer);
        }

        /// <summary>
        /// Starts the race by handing both sources to the coordinator.
        /// </summary>
        protected override void Run(AmbCoordinator sink)
        {
            sink.Run(_left, _right);
        }

        /// <summary>
        /// Owns one observer per source and decides, via an atomic flag, which
        /// of the two is allowed to emit to the downstream observer.
        /// </summary>
        internal sealed class AmbCoordinator : IDisposable
        {
            // Values stored in _winner.
            private const int NoWinner = 0;
            private const int LeftWinner = 1;
            private const int RightWinner = 2;

            private readonly AmbObserver _leftObserver;
            private readonly AmbObserver _rightObserver;

            // NoWinner until the first successful TryWin; never changes afterwards
            // because the only write is a compare-exchange from NoWinner.
            private int _winner;

            /// <summary>
            /// Creates the left and right observers, both targeting the same
            /// downstream <paramref name="observer"/>.
            /// </summary>
            public AmbCoordinator(IObserver<TSource> observer)
            {
                _leftObserver = new AmbObserver(observer, this, true);
                _rightObserver = new AmbObserver(observer, this, false);
            }

            /// <summary>
            /// Runs the left observer against <paramref name="left"/> and then
            /// the right observer against <paramref name="right"/>, in that order.
            /// </summary>
            public void Run(IObservable<TSource> left, IObservable<TSource> right)
            {
                _leftObserver.Run(left);
                _rightObserver.Run(right);
            }

            /// <summary>
            /// Disposes both observers.
            /// </summary>
            public void Dispose()
            {
                _leftObserver.Dispose();
                _rightObserver.Dispose();
            }

            /// <summary>
            /// Attempts to claim the right of emission for one side.
            /// </summary>
            /// <param name="isLeft">True when the caller is the left observer.</param>
            /// <returns>
            /// True if the caller already is, or has just become, the winner;
            /// false if the other side has already won.
            /// </returns>
            public bool TryWin(bool isLeft)
            {
                int contender = isLeft ? LeftWinner : RightWinner;

                // Cheap check first: this side may already hold the win.
                if (Volatile.Read(ref _winner) == contender)
                {
                    return true;
                }

                // Only one side can move the flag away from NoWinner.
                if (Interlocked.CompareExchange(ref _winner, contender, NoWinner) == NoWinner)
                {
                    // The loser is shut down by the winner, right here.
                    AmbObserver loser = isLeft ? _rightObserver : _leftObserver;
                    loser.Dispose();
                    return true;
                }

                return false;
            }

            /// <summary>
            /// Observer attached to one of the two sources. It forwards
            /// notifications downstream only if its side wins the race.
            /// </summary>
            private sealed class AmbObserver : IdentitySink<TSource>
            {
                private readonly AmbCoordinator _parent;
                private readonly bool _isLeft;

                // Set once this observer has won, so later notifications skip
                // the coordinator. It is a plain (non-volatile) field that is
                // read and written only inside this observer's own callbacks.
                // NOTE(review): presumably safe because a single subscription's
                // notifications are delivered serially - confirm.
                private bool _iwon;

                /// <summary>
                /// Creates an observer for one side of the race.
                /// </summary>
                /// <param name="downstream">Observer that receives the winner's notifications.</param>
                /// <param name="parent">Coordinator that arbitrates the race.</param>
                /// <param name="isLeft">True if this observer watches the left source.</param>
                public AmbObserver(IObserver<TSource> downstream, AmbCoordinator parent, bool isLeft)
                    : base(downstream)
                {
                    _parent = parent;
                    _isLeft = isLeft;
                }

                /// <summary>
                /// Forwards completion if this side has won or wins now;
                /// otherwise disposes this observer.
                /// </summary>
                public override void OnCompleted()
                {
                    if (_iwon)
                    {
                        ForwardOnCompleted();
                    }
                    else if (_parent.TryWin(_isLeft))
                    {
                        _iwon = true;
                        ForwardOnCompleted();
                    }
                    else
                    {
                        // Lost the race: the terminal signal is not forwarded.
                        Dispose();
                    }
                }

                /// <summary>
                /// Forwards the error if this side has won or wins now;
                /// otherwise disposes this observer.
                /// </summary>
                public override void OnError(Exception error)
                {
                    if (_iwon)
                    {
                        ForwardOnError(error);
                    }
                    else if (_parent.TryWin(_isLeft))
                    {
                        _iwon = true;
                        ForwardOnError(error);
                    }
                    else
                    {
                        // Lost the race: the error is not forwarded.
                        Dispose();
                    }
                }

                /// <summary>
                /// Forwards the value if this side has won or wins now;
                /// otherwise the value is dropped.
                /// </summary>
                public override void OnNext(TSource value)
                {
                    if (_iwon)
                    {
                        ForwardOnNext(value);
                    }
                    else if (_parent.TryWin(_isLeft))
                    {
                        _iwon = true;
                        ForwardOnNext(value);
                    }

                    // No else branch: when the other side has won, its TryWin
                    // call is the one that disposes this observer.
                }
            }
        }
    }
}