Amb<TSource>
using System.Reactive.Disposables;
namespace System.Reactive.Linq.ObservableImpl
{
internal sealed class Amb<TSource> : Producer<TSource, Amb<TSource>._>
{
internal sealed class _ : Sink<TSource>
{
private sealed class DecisionObserver : IObserver<TSource>
{
private readonly _ _parent;
private readonly AmbState _me;
private readonly IDisposable _subscription;
private readonly IDisposable _otherSubscription;
private readonly object _gate;
private readonly AmbObserver _observer;
public DecisionObserver(_ parent, object gate, AmbState me, IDisposable subscription, IDisposable otherSubscription, AmbObserver observer)
{
_parent = parent;
_gate = gate;
_me = me;
_subscription = subscription;
_otherSubscription = otherSubscription;
_observer = observer;
}
public void OnNext(TSource value)
{
lock (_gate) {
if (_parent._choice == AmbState.Neither) {
_parent._choice = _me;
_otherSubscription.Dispose();
_observer._disposable = _subscription;
_observer._target = _parent._observer;
}
if (_parent._choice == _me)
_parent._observer.OnNext(value);
}
}
public void OnError(Exception error)
{
lock (_gate) {
if (_parent._choice == AmbState.Neither) {
_parent._choice = _me;
_otherSubscription.Dispose();
_observer._disposable = _subscription;
_observer._target = _parent._observer;
}
if (_parent._choice == _me) {
_parent._observer.OnError(error);
_parent.Dispose();
}
}
}
public void OnCompleted()
{
lock (_gate) {
if (_parent._choice == AmbState.Neither) {
_parent._choice = _me;
_otherSubscription.Dispose();
_observer._disposable = _subscription;
_observer._target = _parent._observer;
}
if (_parent._choice == _me) {
_parent._observer.OnCompleted();
_parent.Dispose();
}
}
}
}
private sealed class AmbObserver : IObserver<TSource>
{
public IObserver<TSource> _target;
public IDisposable _disposable;
public void OnNext(TSource value)
{
_target.OnNext(value);
}
public void OnError(Exception error)
{
_target.OnError(error);
_disposable.Dispose();
}
public void OnCompleted()
{
_target.OnCompleted();
_disposable.Dispose();
}
}
private enum AmbState
{
Left,
Right,
Neither
}
private AmbState _choice;
public _(IObserver<TSource> observer, IDisposable cancel)
: base(observer, cancel)
{
}
public IDisposable Run(Amb<TSource> parent)
{
SingleAssignmentDisposable singleAssignmentDisposable = new SingleAssignmentDisposable();
SingleAssignmentDisposable singleAssignmentDisposable2 = new SingleAssignmentDisposable();
ICancelable cancelable = StableCompositeDisposable.Create(singleAssignmentDisposable, singleAssignmentDisposable2);
object gate = new object();
AmbObserver ambObserver = new AmbObserver();
ambObserver._disposable = cancelable;
ambObserver._target = new DecisionObserver(this, gate, AmbState.Left, singleAssignmentDisposable, singleAssignmentDisposable2, ambObserver);
AmbObserver ambObserver2 = new AmbObserver();
ambObserver2._disposable = cancelable;
ambObserver2._target = new DecisionObserver(this, gate, AmbState.Right, singleAssignmentDisposable2, singleAssignmentDisposable, ambObserver2);
_choice = AmbState.Neither;
singleAssignmentDisposable.Disposable = ObservableExtensions.SubscribeSafe<TSource>(parent._left, (IObserver<TSource>)ambObserver);
singleAssignmentDisposable2.Disposable = ObservableExtensions.SubscribeSafe<TSource>(parent._right, (IObserver<TSource>)ambObserver2);
return cancelable;
}
}
private readonly IObservable<TSource> _left;
private readonly IObservable<TSource> _right;
public Amb(IObservable<TSource> left, IObservable<TSource> right)
{
_left = left;
_right = right;
}
protected override _ CreateSink(IObserver<TSource> observer, IDisposable cancel)
{
return new _(observer, cancel);
}
protected override IDisposable Run(_ sink)
{
return sink.Run(this);
}
}
}