AmbCoordinator<T>
using System.Reactive.Disposables;
using System.Runtime.CompilerServices;
using System.Threading;
namespace System.Reactive.Linq.ObservableImpl
{
[System.Runtime.CompilerServices.NullableContext(1)]
[System.Runtime.CompilerServices.Nullable(0)]
internal sealed class AmbCoordinator<[System.Runtime.CompilerServices.Nullable(2)] T> : IDisposable
{
[System.Runtime.CompilerServices.Nullable(new byte[] {
0,
1
})]
internal sealed class InnerObserver : IdentitySink<T>
{
private readonly AmbCoordinator<T> _parent;
private readonly int _index;
private bool _won;
public InnerObserver(AmbCoordinator<T> parent, int index)
: base(parent._downstream)
{
_parent = parent;
_index = index;
}
public override void OnCompleted()
{
if (_won)
ForwardOnCompleted();
else if (_parent.TryWin(_index)) {
_won = true;
ForwardOnCompleted();
} else {
Dispose();
}
}
public override void OnError(Exception error)
{
if (_won)
ForwardOnError(error);
else if (_parent.TryWin(_index)) {
_won = true;
ForwardOnError(error);
} else {
Dispose();
}
}
public override void OnNext(T value)
{
if (_won)
ForwardOnNext(value);
else if (_parent.TryWin(_index)) {
_won = true;
ForwardOnNext(value);
} else {
Dispose();
}
}
}
private readonly IObserver<T> _downstream;
[System.Runtime.CompilerServices.Nullable(new byte[] {
1,
2,
0
})]
private readonly InnerObserver[] _observers;
private int _winner;
internal AmbCoordinator(IObserver<T> downstream, int n)
{
_downstream = downstream;
InnerObserver[] array = new InnerObserver[n];
for (int i = 0; i < n; i++) {
array[i] = new InnerObserver(this, i);
}
_observers = array;
Volatile.Write(ref _winner, -1);
}
internal static IDisposable Create(IObserver<T> observer, IObservable<T>[] sources)
{
int num = sources.Length;
switch (num) {
case 0:
observer.OnCompleted();
return Disposable.Empty;
case 1:
return sources[0].Subscribe(observer);
default: {
AmbCoordinator<T> ambCoordinator = new AmbCoordinator<T>(observer, num);
ambCoordinator.Subscribe(sources);
return ambCoordinator;
}
}
}
internal void Subscribe(IObservable<T>[] sources)
{
for (int i = 0; i < _observers.Length; i++) {
InnerObserver innerObserver = Volatile.Read<InnerObserver>(ref _observers[i]);
if (innerObserver == null)
break;
innerObserver.Run(sources[i]);
}
}
public void Dispose()
{
for (int i = 0; i < _observers.Length; i++) {
Interlocked.Exchange<InnerObserver>(ref _observers[i], (InnerObserver)null)?.Dispose();
}
}
private bool TryWin(int index)
{
if (Volatile.Read(ref _winner) == -1 && Interlocked.CompareExchange(ref _winner, index, -1) == -1) {
for (int i = 0; i < _observers.Length; i++) {
if (index != i)
Interlocked.Exchange<InnerObserver>(ref _observers[i], (InnerObserver)null)?.Dispose();
}
return true;
}
return false;
}
}
}