<PackageReference Include="System.Reactive" Version="6.1.0-preview.9" />

AmbCoordinator<T>

sealed class AmbCoordinator<T> : IDisposable
using System.Reactive.Disposables; using System.Runtime.CompilerServices; using System.Threading; namespace System.Reactive.Linq.ObservableImpl { [System.Runtime.CompilerServices.NullableContext(1)] [System.Runtime.CompilerServices.Nullable(0)] internal sealed class AmbCoordinator<[System.Runtime.CompilerServices.Nullable(2)] T> : IDisposable { [System.Runtime.CompilerServices.Nullable(new byte[] { 0, 1 })] internal sealed class InnerObserver : IdentitySink<T> { private readonly AmbCoordinator<T> _parent; private readonly int _index; private bool _won; public InnerObserver(AmbCoordinator<T> parent, int index) : base(parent._downstream) { _parent = parent; _index = index; } public override void OnCompleted() { if (_won) ForwardOnCompleted(); else if (_parent.TryWin(_index)) { _won = true; ForwardOnCompleted(); } else { Dispose(); } } public override void OnError(Exception error) { if (_won) ForwardOnError(error); else if (_parent.TryWin(_index)) { _won = true; ForwardOnError(error); } else { Dispose(); } } public override void OnNext(T value) { if (_won) ForwardOnNext(value); else if (_parent.TryWin(_index)) { _won = true; ForwardOnNext(value); } else { Dispose(); } } } private readonly IObserver<T> _downstream; [System.Runtime.CompilerServices.Nullable(new byte[] { 1, 2, 0 })] private readonly InnerObserver[] _observers; private int _winner; internal AmbCoordinator(IObserver<T> downstream, int n) { _downstream = downstream; InnerObserver[] array = new InnerObserver[n]; for (int i = 0; i < n; i++) { array[i] = new InnerObserver(this, i); } _observers = array; Volatile.Write(ref _winner, -1); } internal static IDisposable Create(IObserver<T> observer, IObservable<T>[] sources) { int num = sources.Length; switch (num) { case 0: observer.OnCompleted(); return Disposable.Empty; case 1: return sources[0].Subscribe(observer); default: { AmbCoordinator<T> ambCoordinator = new AmbCoordinator<T>(observer, num); ambCoordinator.Subscribe(sources); return ambCoordinator; } } } internal void Subscribe(IObservable<T>[] sources) { for (int i = 0; i < _observers.Length; i++) { InnerObserver innerObserver = Volatile.Read<InnerObserver>(ref _observers[i]); if (innerObserver == null) break; innerObserver.Run(sources[i]); } } public void Dispose() { for (int i = 0; i < _observers.Length; i++) { Interlocked.Exchange<InnerObserver>(ref _observers[i], (InnerObserver)null)?.Dispose(); } } private bool TryWin(int index) { if (Volatile.Read(ref _winner) == -1 && Interlocked.CompareExchange(ref _winner, index, -1) == -1) { for (int i = 0; i < _observers.Length; i++) { if (index != i) Interlocked.Exchange<InnerObserver>(ref _observers[i], (InnerObserver)null)?.Dispose(); } return true; } return false; } } }