RedoSerializedObserver<X>
using System.
Collections.
Concurrent;
using System.
Threading;
namespace System.
Reactive.
Linq.
ObservableImpl
{
internal sealed class RedoSerializedObserver<
X> :
IObserver<
X>
{
private readonly IObserver<
X>
_downstream;
private int _wip;
private Exception _terminalException;
private static readonly Exception SignaledIndicator =
new Exception();
private readonly ConcurrentQueue<
X>
_queue;
internal RedoSerializedObserver(
IObserver<
X>
downstream)
{
_downstream =
downstream;
_queue =
new ConcurrentQueue<
X>();
}
public void OnCompleted()
{
if (
Interlocked.
CompareExchange<
Exception>(
ref _terminalException,
ExceptionHelper.
Terminated, (
Exception)
null) ==
null)
Drain();
}
public void OnError(
Exception error)
{
if (
Interlocked.
CompareExchange<
Exception>(
ref _terminalException,
error, (
Exception)
null) ==
null)
Drain();
}
public void OnNext(
X value)
{
_queue.
Enqueue(
value);
Drain();
}
private void Clear()
{
X result;
while (
_queue.
TryDequeue(
out result)) {
}
}
private void Drain()
{
if (
Interlocked.
Increment(
ref _wip) ==
1) {
int num =
1;
do {
Exception ex =
Volatile.
Read<
Exception>(
ref _terminalException);
if (
ex ==
null) {
X result;
while (
_queue.
TryDequeue(
out result)) {
_downstream.
OnNext(
result);
}
}
else {
if (
ex !=
SignaledIndicator) {
Interlocked.
Exchange<
Exception>(
ref _terminalException,
SignaledIndicator);
if (
ex !=
ExceptionHelper.
Terminated)
_downstream.
OnError(
ex);
else
_downstream.
OnCompleted();
}
Clear();
}
num =
Interlocked.
Add(
ref _wip, -
num);
}
while (
num !=
0);
}
}
}
}