<PackageReference Include="System.Reactive" Version="4.1.6" />

ForEach<TSource>

sealed class ForEach<TSource>
using System.Threading; namespace System.Reactive.Linq.ObservableImpl { internal sealed class ForEach<TSource> { public sealed class Observer : IObserver<TSource> { private readonly Action<TSource> _onNext; private readonly Action _done; private Exception _exception; private int _stopped; public Exception Error => _exception; public Observer(Action<TSource> onNext, Action done) { _onNext = onNext; _done = done; } public void OnNext(TSource value) { if (_stopped == 0) try { _onNext(value); } catch (Exception error) { OnError(error); } } public void OnError(Exception error) { if (Interlocked.Exchange(ref _stopped, 1) == 0) { _exception = error; _done(); } } public void OnCompleted() { if (Interlocked.Exchange(ref _stopped, 1) == 0) _done(); } } public sealed class ObserverIndexed : IObserver<TSource> { private readonly Action<TSource, int> _onNext; private readonly Action _done; private int _index; private Exception _exception; private int _stopped; public Exception Error => _exception; public ObserverIndexed(Action<TSource, int> onNext, Action done) { _onNext = onNext; _done = done; } public void OnNext(TSource value) { if (_stopped == 0) try { _onNext(value, checked(_index++)); } catch (Exception error) { OnError(error); } } public void OnError(Exception error) { if (Interlocked.Exchange(ref _stopped, 1) == 0) { _exception = error; _done(); } } public void OnCompleted() { if (Interlocked.Exchange(ref _stopped, 1) == 0) _done(); } } } }