<PackageReference Include="System.Reactive" Version="4.0.0" />

Amb<TSource>

sealed class Amb<TSource> : Producer<TSource, _<TSource>>
using System.Reactive.Disposables;

namespace System.Reactive.Linq.ObservableImpl
{
    /// <summary>
    /// Producer over two sources: both are subscribed, and whichever one signals first
    /// (a value, an error, or completion) is the only one relayed from then on.
    /// The subscription to the other source is disposed at the moment of that decision.
    /// </summary>
    /// <typeparam name="TSource">Element type of both sources.</typeparam>
    internal sealed class Amb<TSource> : Producer<TSource, Amb<TSource>._>
    {
        private readonly IObservable<TSource> _left;
        private readonly IObservable<TSource> _right;

        /// <summary>Captures the two competing sources.</summary>
        public Amb(IObservable<TSource> left, IObservable<TSource> right)
        {
            _left = left;
            _right = right;
        }

        /// <summary>Creates the sink that carries the per-subscription race state.</summary>
        protected override _ CreateSink(IObserver<TSource> observer, IDisposable cancel)
        {
            return new _(observer, cancel);
        }

        /// <summary>Starts the race by handing this producer to the sink.</summary>
        protected override IDisposable Run(_ sink)
        {
            return sink.Run(this);
        }

        /// <summary>
        /// Per-subscription sink. Holds the shared <see cref="_choice"/> that records which
        /// side (if any) has won the race.
        /// </summary>
        internal sealed class _ : Sink<TSource>
        {
            // Which source has been selected. Assigned Neither in Run before subscribing;
            // note that the enum's default value would otherwise be Left.
            private AmbState _choice;

            public _(IObserver<TSource> observer, IDisposable cancel)
                : base(observer, cancel)
            {
            }

            /// <summary>
            /// Subscribes to the left source and then the right source, each through its own
            /// <see cref="AmbObserver"/> that initially routes into a <see cref="DecisionObserver"/>.
            /// </summary>
            /// <param name="parent">The producer supplying the two sources.</param>
            /// <returns>A composite disposable covering both subscriptions.</returns>
            public IDisposable Run(Amb<TSource> parent)
            {
                var leftSubscription = new SingleAssignmentDisposable();
                var rightSubscription = new SingleAssignmentDisposable();
                ICancelable both = StableCompositeDisposable.Create(leftSubscription, rightSubscription);

                // One gate shared by both decision observers so the choice is made atomically.
                var gate = new object();

                var leftObserver = new AmbObserver { _disposable = both };
                leftObserver._target = new DecisionObserver(
                    this, gate, AmbState.Left, leftSubscription, rightSubscription, leftObserver);

                var rightObserver = new AmbObserver { _disposable = both };
                rightObserver._target = new DecisionObserver(
                    this, gate, AmbState.Right, rightSubscription, leftSubscription, rightObserver);

                _choice = AmbState.Neither;

                // Left is subscribed first, then right.
                leftSubscription.Disposable = parent._left.SubscribeSafe(leftObserver);
                rightSubscription.Disposable = parent._right.SubscribeSafe(rightObserver);

                return both;
            }

            /// <summary>Identifies the winner of the race, or that no winner exists yet.</summary>
            private enum AmbState
            {
                Left,
                Right,
                Neither
            }

            /// <summary>
            /// Mutable forwarding observer placed directly on a source. It relays to
            /// <see cref="_target"/> and, on a terminal signal, disposes <see cref="_disposable"/>
            /// after relaying. Both fields are re-pointed by <see cref="DecisionObserver"/> when
            /// its side wins.
            /// </summary>
            private sealed class AmbObserver : IObserver<TSource>
            {
                public IObserver<TSource> _target;
                public IDisposable _disposable;

                public void OnNext(TSource value)
                {
                    _target.OnNext(value);
                }

                public void OnError(Exception error)
                {
                    _target.OnError(error);
                    _disposable.Dispose();
                }

                public void OnCompleted()
                {
                    _target.OnCompleted();
                    _disposable.Dispose();
                }
            }

            /// <summary>
            /// Initial target of one side's <see cref="AmbObserver"/>. Under the shared gate it
            /// either claims the race for its side or, if the other side already won, drops
            /// the signal.
            /// </summary>
            private sealed class DecisionObserver : IObserver<TSource>
            {
                private readonly _ _parent;
                private readonly object _gate;
                private readonly AmbState _me;
                private readonly IDisposable _subscription;
                private readonly IDisposable _otherSubscription;
                private readonly AmbObserver _observer;

                public DecisionObserver(
                    _ parent,
                    object gate,
                    AmbState me,
                    IDisposable subscription,
                    IDisposable otherSubscription,
                    AmbObserver observer)
                {
                    _parent = parent;
                    _gate = gate;
                    _me = me;
                    _subscription = subscription;
                    _otherSubscription = otherSubscription;
                    _observer = observer;
                }

                public void OnNext(TSource value)
                {
                    lock (_gate)
                    {
                        if (IsWinner())
                        {
                            _parent._observer.OnNext(value);
                        }
                    }
                }

                public void OnError(Exception error)
                {
                    lock (_gate)
                    {
                        if (IsWinner())
                        {
                            _parent._observer.OnError(error);
                            _parent.Dispose();
                        }
                    }
                }

                public void OnCompleted()
                {
                    lock (_gate)
                    {
                        if (IsWinner())
                        {
                            _parent._observer.OnCompleted();
                            _parent.Dispose();
                        }
                    }
                }

                /// <summary>
                /// Claims the race for this side if it is still undecided, then reports whether
                /// this side is the selected one. Callers invoke this while holding the gate.
                /// </summary>
                private bool IsWinner()
                {
                    if (_parent._choice == AmbState.Neither)
                    {
                        _parent._choice = _me;

                        // The losing source is no longer needed.
                        _otherSubscription.Dispose();

                        // Re-point this side's AmbObserver at the sink's _observer so later
                        // signals no longer pass through this decision step; a terminal signal
                        // on that path disposes only this side's subscription.
                        _observer._disposable = _subscription;
                        _observer._target = _parent._observer;
                    }

                    return _parent._choice == _me;
                }
            }
        }
    }
}