ForEach<TSource>
using System.
Threading;
namespace System.
Reactive.
Linq.
ObservableImpl
{
internal sealed class ForEach<
TSource>
{
public sealed class Observer :
IObserver<
TSource>
{
private readonly Action<
TSource>
_onNext;
private readonly Action _done;
private Exception _exception;
private int _stopped;
public Exception Error =>
_exception;
public Observer(
Action<
TSource>
onNext,
Action done)
{
_onNext =
onNext;
_done =
done;
}
public void OnNext(
TSource value)
{
if (
_stopped ==
0)
try {
_onNext(
value);
}
catch (
Exception error) {
OnError(
error);
}
}
public void OnError(
Exception error)
{
if (
Interlocked.
Exchange(
ref _stopped,
1) ==
0) {
_exception =
error;
_done();
}
}
public void OnCompleted()
{
if (
Interlocked.
Exchange(
ref _stopped,
1) ==
0)
_done();
}
}
public sealed class ObserverIndexed :
IObserver<
TSource>
{
private readonly Action<
TSource,
int>
_onNext;
private readonly Action _done;
private int _index;
private Exception _exception;
private int _stopped;
public Exception Error =>
_exception;
public ObserverIndexed(
Action<
TSource,
int>
onNext,
Action done)
{
_onNext =
onNext;
_done =
done;
}
public void OnNext(
TSource value)
{
if (
_stopped ==
0)
try {
_onNext(
value,
checked(
_index++));
}
catch (
Exception error) {
OnError(
error);
}
}
public void OnError(
Exception error)
{
if (
Interlocked.
Exchange(
ref _stopped,
1) ==
0) {
_exception =
error;
_done();
}
}
public void OnCompleted()
{
if (
Interlocked.
Exchange(
ref _stopped,
1) ==
0)
_done();
}
}
}
}