<PackageReference Include="System.Reactive" Version="6.0.1-preview.1" />
ForEach<TSource>
using System.
Runtime.
CompilerServices;
using System.
Threading;
namespace System.
Reactive.
Linq.
ObservableImpl
{
[
System.
Runtime.
CompilerServices.
NullableContext(
1)]
[
System.
Runtime.
CompilerServices.
Nullable(
0)]
internal sealed class ForEach<[
System.
Runtime.
CompilerServices.
Nullable(
2)]
TSource>
{
/// <summary>
/// Observer that doubles as a <see cref="ManualResetEventSlim"/>: the event is set by
/// the first call to <see cref="OnError"/> or <see cref="OnCompleted"/>, after which
/// further notifications are ignored.
/// </summary>
[System.Runtime.CompilerServices.Nullable(0)]
public abstract class ObserverBase : ManualResetEventSlim, IObserver<TSource>
{
    /// <summary>Exception stored by the terminating <see cref="OnError"/> call; otherwise unset.</summary>
    [System.Runtime.CompilerServices.Nullable(2)]
    private Exception _exception;

    /// <summary>0 while notifications are still accepted, 1 once the observer has terminated.</summary>
    private int _stopped;

    /// <summary>Gets the exception recorded by <see cref="OnError"/>, or null if none was recorded.</summary>
    [System.Runtime.CompilerServices.Nullable(2)]
    public Exception Error
    {
        [System.Runtime.CompilerServices.NullableContext(2)]
        get => _exception;
    }

    /// <summary>Handles a single element; invoked by <see cref="OnNext"/> while the observer is not stopped.</summary>
    /// <param name="value">The element to handle.</param>
    protected abstract void OnNextCore(TSource value);

    /// <summary>
    /// Forwards <paramref name="value"/> to <see cref="OnNextCore"/> unless the observer has
    /// already stopped. Any exception thrown by the handler is routed to <see cref="OnError"/>.
    /// </summary>
    /// <param name="value">The element to handle.</param>
    public void OnNext(TSource value)
    {
        if (Volatile.Read(ref _stopped) != 0)
        {
            // Already terminated: drop the element.
            return;
        }

        try
        {
            OnNextCore(value);
        }
        catch (Exception failure)
        {
            OnError(failure);
        }
    }

    /// <summary>
    /// Terminates the observer with <paramref name="error"/>. Only the first terminating call
    /// (error or completion) has any effect: it records the exception and sets the event.
    /// </summary>
    /// <param name="error">The exception to record.</param>
    public void OnError(Exception error)
    {
        // Atomically claim the stopped flag; a non-zero previous value means someone got here first.
        bool alreadyStopped = Interlocked.Exchange(ref _stopped, 1) != 0;
        if (alreadyStopped)
        {
            return;
        }

        _exception = error;
        Set();
    }

    /// <summary>
    /// Terminates the observer without an error. Only the first terminating call
    /// (error or completion) has any effect: it sets the event.
    /// </summary>
    public void OnCompleted()
    {
        bool alreadyStopped = Interlocked.Exchange(ref _stopped, 1) != 0;
        if (alreadyStopped)
        {
            return;
        }

        Set();
    }
}
/// <summary>
/// <see cref="ObserverBase"/> implementation that passes every element to a
/// caller-supplied <see cref="Action{T}"/>.
/// </summary>
[System.Runtime.CompilerServices.Nullable(0)]
public sealed class Observer : ObserverBase
{
    /// <summary>Delegate invoked for each element.</summary>
    private readonly Action<TSource> _handler;

    /// <summary>Creates an observer that invokes <paramref name="onNext"/> for each element.</summary>
    /// <param name="onNext">The per-element callback; stored as-is without validation.</param>
    public Observer(Action<TSource> onNext) => _handler = onNext;

    /// <summary>Invokes the stored callback with <paramref name="value"/>.</summary>
    /// <param name="value">The element to pass to the callback.</param>
    protected override void OnNextCore(TSource value) => _handler(value);
}
/// <summary>
/// <see cref="ObserverBase"/> implementation that passes every element, together with a
/// running index, to a caller-supplied <see cref="Action{T1, T2}"/>.
/// </summary>
[System.Runtime.CompilerServices.Nullable(0)]
public sealed class ObserverIndexed : ObserverBase
{
    /// <summary>Delegate invoked for each element and its index.</summary>
    private readonly Action<TSource, int> _handler;

    /// <summary>Index handed to the next callback invocation; starts at the field default of 0.</summary>
    private int _position;

    /// <summary>Creates an observer that invokes <paramref name="onNext"/> for each element.</summary>
    /// <param name="onNext">The per-element callback; stored as-is without validation.</param>
    public ObserverIndexed(Action<TSource, int> onNext) => _handler = onNext;

    /// <summary>
    /// Invokes the stored callback with <paramref name="value"/> and the current index,
    /// then advances the index. The increment is overflow-checked.
    /// </summary>
    /// <param name="value">The element to pass to the callback.</param>
    protected override void OnNextCore(TSource value)
    {
        // Checked context: incrementing past int.MaxValue throws instead of wrapping around.
        checked
        {
            _handler(value, _position++);
        }
    }
}
}
}