<PackageReference Include="System.Reactive" Version="6.0.1-preview.1" />

ForEach<TSource>

sealed class ForEach<TSource>
#nullable enable

using System.Runtime.CompilerServices;
using System.Threading;

namespace System.Reactive.Linq.ObservableImpl
{
    /// <summary>
    /// Groups the observer types defined for <c>ForEach</c>. Every observer is itself a
    /// <see cref="ManualResetEventSlim"/> that gets set once the observed sequence terminates.
    /// </summary>
    /// <typeparam name="TSource">Type of the elements delivered to the observer.</typeparam>
    /// <remarks>
    /// Nullability is expressed with regular annotations (<c>Exception?</c>) under
    /// <c>#nullable enable</c>. The compiler emits the nullable metadata attributes on its own;
    /// applying them by hand is rejected at compile time.
    /// </remarks>
    internal sealed class ForEach<TSource>
    {
        /// <summary>
        /// Base observer that forwards elements to <see cref="OnNextCore"/> until the first terminal
        /// notification, records the terminating error (if any) and then signals the event.
        /// </summary>
        /// <remarks>
        /// NOTE(review): presumably a caller waits on this event and inspects <see cref="Error"/>
        /// afterwards; the waiting side is not visible in this file.
        /// </remarks>
        public abstract class ObserverBase : ManualResetEventSlim, IObserver<TSource>
        {
            private Exception? _exception;

            // 0 while the observer accepts notifications, 1 after the first OnError/OnCompleted.
            private int _stopped;

            /// <summary>
            /// Gets the exception passed to <see cref="OnError"/> (or thrown by
            /// <see cref="OnNextCore"/>), or <c>null</c> if no error has been recorded.
            /// </summary>
            public Exception? Error => _exception;

            /// <summary>Handles a single element; invoked only while the observer is not stopped.</summary>
            /// <param name="value">The element to handle.</param>
            protected abstract void OnNextCore(TSource value);

            /// <summary>
            /// Passes <paramref name="value"/> to <see cref="OnNextCore"/> unless the observer has
            /// already stopped. An exception thrown by <see cref="OnNextCore"/> is not propagated to
            /// the caller; it is routed to <see cref="OnError"/> instead.
            /// </summary>
            /// <param name="value">The element to handle.</param>
            public void OnNext(TSource value)
            {
                if (Volatile.Read(ref _stopped) == 0)
                {
                    try
                    {
                        OnNextCore(value);
                    }
                    catch (Exception error)
                    {
                        OnError(error);
                    }
                }
            }

            /// <summary>
            /// Stops the observer, stores <paramref name="error"/> and sets the event. Has no effect
            /// if the observer was already stopped.
            /// </summary>
            /// <param name="error">The error that terminated the sequence.</param>
            public void OnError(Exception error)
            {
                // The atomic exchange lets exactly one terminal notification win.
                if (Interlocked.Exchange(ref _stopped, 1) == 0)
                {
                    // Store the error before signalling so it is in place when a waiter wakes up.
                    _exception = error;
                    Set();
                }
            }

            /// <summary>
            /// Stops the observer and sets the event. Has no effect if the observer was already stopped.
            /// </summary>
            public void OnCompleted()
            {
                if (Interlocked.Exchange(ref _stopped, 1) == 0)
                {
                    Set();
                }
            }
        }

        /// <summary>Observer that invokes a delegate with each element.</summary>
        public sealed class Observer : ObserverBase
        {
            private readonly Action<TSource> _onNext;

            /// <summary>Creates the observer.</summary>
            /// <param name="onNext">Delegate invoked with each element.</param>
            public Observer(Action<TSource> onNext)
            {
                _onNext = onNext;
            }

            /// <inheritdoc/>
            protected override void OnNextCore(TSource value)
            {
                _onNext(value);
            }
        }

        /// <summary>
        /// Observer that invokes a delegate with each element and a running index that starts at 0.
        /// </summary>
        public sealed class ObserverIndexed : ObserverBase
        {
            private readonly Action<TSource, int> _onNext;
            private int _index;

            /// <summary>Creates the observer.</summary>
            /// <param name="onNext">Delegate invoked with each element and its index.</param>
            public ObserverIndexed(Action<TSource, int> onNext)
            {
                _onNext = onNext;
            }

            /// <inheritdoc/>
            protected override void OnNextCore(TSource value)
            {
                // The increment is checked: overflowing the int index throws OverflowException,
                // which OnNext catches and forwards to OnError.
                _onNext(value, checked(_index++));
            }
        }
    }
}

#nullable restore