Sink<TTarget>
using System.Reactive.Disposables;
using System.Runtime.CompilerServices;
using System.Threading;
namespace System.Reactive
{
[System.Runtime.CompilerServices.NullableContext(1)]
[System.Runtime.CompilerServices.Nullable(0)]
internal abstract class Sink<[System.Runtime.CompilerServices.Nullable(2)] TTarget> : ISink<TTarget>, IDisposable
{
private SingleAssignmentDisposableValue _upstream;
private volatile IObserver<TTarget> _observer;
protected Sink(IObserver<TTarget> observer)
{
_observer = observer;
}
public void Dispose()
{
if (Interlocked.Exchange<IObserver<TTarget>>(ref _observer, NopObserver<TTarget>.Instance) != NopObserver<TTarget>.Instance)
Dispose(true);
}
protected virtual void Dispose(bool disposing)
{
_upstream.Dispose();
}
public void ForwardOnNext(TTarget value)
{
_observer.OnNext(value);
}
public void ForwardOnCompleted()
{
_observer.OnCompleted();
Dispose();
}
public void ForwardOnError(Exception error)
{
_observer.OnError(error);
Dispose();
}
protected void SetUpstream(IDisposable upstream)
{
_upstream.Disposable = upstream;
}
protected void DisposeUpstream()
{
_upstream.Dispose();
}
}
}