BaseBlocking<T>
using System.
Reactive.
Disposables;
using System.
Threading;
namespace System.
Reactive.
Linq.
ObservableImpl
{
internal abstract class BaseBlocking<
T> :
CountdownEvent,
IObserver<
T>
{
protected IDisposable _upstream;
internal T _value;
internal bool _hasValue;
internal Exception _error;
private int _once;
internal BaseBlocking()
:
base(
1)
{
}
internal void SetUpstream(
IDisposable d)
{
Disposable.
SetSingle(
ref _upstream,
d);
}
protected void Unblock()
{
if (
Interlocked.
CompareExchange(
ref _once,
1,
0) ==
0)
Signal();
}
public abstract void OnCompleted();
public virtual void OnError(
Exception error)
{
_value =
default(
T);
_error =
error;
Unblock();
}
public abstract void OnNext(
T value);
public new void Dispose()
{
base.
Dispose();
if (!
Disposable.
GetIsDisposed(
ref _upstream))
Disposable.
TryDispose(
ref _upstream);
}
}
}