<PackageReference Include="System.Reactive" Version="4.2.0-preview.102" />

Amb<TSource>

sealed class Amb<TSource> : Producer<TSource, AmbCoordinator<TSource>>
using System.Threading; namespace System.Reactive.Linq.ObservableImpl { internal sealed class Amb<TSource> : Producer<TSource, Amb<TSource>.AmbCoordinator> { internal sealed class AmbCoordinator : IDisposable { private sealed class AmbObserver : IdentitySink<TSource> { private readonly AmbCoordinator _parent; private readonly bool _isLeft; private bool _iwon; public AmbObserver(IObserver<TSource> downstream, AmbCoordinator parent, bool isLeft) : base(downstream) { _parent = parent; _isLeft = isLeft; } public override void OnCompleted() { if (_iwon) ForwardOnCompleted(); else if (_parent.TryWin(_isLeft)) { _iwon = true; ForwardOnCompleted(); } else { Dispose(); } } public override void OnError(Exception error) { if (_iwon) ForwardOnError(error); else if (_parent.TryWin(_isLeft)) { _iwon = true; ForwardOnError(error); } else { Dispose(); } } public override void OnNext(TSource value) { if (_iwon) ForwardOnNext(value); else if (_parent.TryWin(_isLeft)) { _iwon = true; ForwardOnNext(value); } } } private readonly AmbObserver _leftObserver; private readonly AmbObserver _rightObserver; private int _winner; public AmbCoordinator(IObserver<TSource> observer) { _leftObserver = new AmbObserver(observer, this, true); _rightObserver = new AmbObserver(observer, this, false); } public void Run(IObservable<TSource> left, IObservable<TSource> right) { _leftObserver.Run(left); _rightObserver.Run(right); } public void Dispose() { _leftObserver.Dispose(); _rightObserver.Dispose(); } public bool TryWin(bool isLeft) { int num = isLeft ? 1 : 2; if (Volatile.Read(ref _winner) == num) return true; if (Interlocked.CompareExchange(ref _winner, num, 0) == 0) { (isLeft ? _rightObserver : _leftObserver).Dispose(); return true; } return false; } } private readonly IObservable<TSource> _left; private readonly IObservable<TSource> _right; public Amb(IObservable<TSource> left, IObservable<TSource> right) { _left = left; _right = right; } protected override AmbCoordinator CreateSink(IObserver<TSource> observer) { return new AmbCoordinator(observer); } protected override void Run(AmbCoordinator sink) { sink.Run(_left, _right); } } }