AnonymousSafeObserver<T>
using System.Threading;
namespace System.Reactive
{
internal sealed class AnonymousSafeObserver<T> : IObserver<T>
{
private readonly Action<T> _onNext;
private readonly Action<Exception> _onError;
private readonly Action _onCompleted;
private readonly IDisposable _disposable;
private int isStopped;
public AnonymousSafeObserver(Action<T> onNext, Action<Exception> onError, Action onCompleted, IDisposable disposable)
{
_onNext = onNext;
_onError = onError;
_onCompleted = onCompleted;
_disposable = disposable;
}
public void OnNext(T value)
{
if (isStopped == 0) {
bool flag = false;
try {
_onNext(value);
flag = true;
} finally {
if (!flag)
_disposable.Dispose();
}
}
}
public void OnError(Exception error)
{
if (Interlocked.Exchange(ref isStopped, 1) == 0)
try {
_onError(error);
} finally {
_disposable.Dispose();
}
}
public void OnCompleted()
{
if (Interlocked.Exchange(ref isStopped, 1) == 0)
try {
_onCompleted();
} finally {
_disposable.Dispose();
}
}
}
}