// Sink<TSource>
using System.Threading;
namespace System.Reactive
{
/// <summary>
/// Base class for sinks: holds the observer that receives notifications and a
/// disposable that is released when the sink is disposed.
/// </summary>
/// <typeparam name="TSource">Type of the values passed to the observer.</typeparam>
internal abstract class Sink<TSource> : IDisposable
{
    /// <summary>
    /// Observer that currently receives notifications. <see cref="Dispose"/> replaces it
    /// with <c>NopObserver&lt;TSource&gt;.Instance</c>.
    /// </summary>
    protected internal volatile IObserver<TSource> _observer;

    // Disposable handed in at construction; Dispose swaps it for null before disposing it.
    private IDisposable _cancel;

    /// <summary>
    /// Stores the observer to notify and the disposable to release on disposal.
    /// </summary>
    /// <param name="observer">Observer that notifications are forwarded to.</param>
    /// <param name="cancel">Disposable that <see cref="Dispose"/> disposes.</param>
    public Sink(IObserver<TSource> observer, IDisposable cancel)
    {
        _observer = observer;
        _cancel = cancel;
    }

    /// <summary>
    /// Redirects further notifications to <c>NopObserver&lt;TSource&gt;.Instance</c> and
    /// disposes the stored disposable.
    /// </summary>
    public virtual void Dispose()
    {
        _observer = NopObserver<TSource>.Instance;

        // The atomic swap hands the stored disposable to exactly one caller;
        // every later call sees null and skips the disposal.
        var pending = Interlocked.Exchange(ref _cancel, null);
        if (pending != null)
        {
            pending.Dispose();
        }
    }

    /// <summary>
    /// Creates an observer that relays notifications to this sink's current observer
    /// and disposes the sink after forwarding a terminal notification.
    /// </summary>
    /// <returns>A new forwarding observer bound to this sink.</returns>
    public IObserver<TSource> GetForwarder() => new Forwarder(this);

    /// <summary>
    /// Observer that passes every notification to the owning sink's observer.
    /// </summary>
    private sealed class Forwarder : IObserver<TSource>
    {
        private readonly Sink<TSource> _sink;

        public Forwarder(Sink<TSource> sink)
        {
            _sink = sink;
        }

        /// <summary>Relays a value to the sink's current observer.</summary>
        public void OnNext(TSource value) => _sink._observer.OnNext(value);

        /// <summary>Relays the error, then disposes the sink.</summary>
        public void OnError(Exception error)
        {
            var target = _sink._observer;
            target.OnError(error);
            _sink.Dispose();
        }

        /// <summary>Relays completion, then disposes the sink.</summary>
        public void OnCompleted()
        {
            var target = _sink._observer;
            target.OnCompleted();
            _sink.Dispose();
        }
    }
}
}