// HalfSerializer
// Utility methods for serializing the OnXXX signals delivered to
// an IObserver in the case where concurrent OnNext is still not allowed
// but OnError/OnCompleted may be called concurrently.
// This kind of serialization generally has lower overhead than
// a full SerializedObserver wrapper and requires no
// allocation.
using System.Diagnostics;
using System.Runtime.CompilerServices;
using System.Threading;
namespace System.Reactive
{
    // Nullability is expressed with C# annotations (`?`) instead of the
    // compiler-reserved NullableContext/Nullable attributes, which the compiler
    // emits itself and does not accept when written explicitly in source.
#nullable enable
    /// <summary>
    /// Helpers that forward OnNext/OnError/OnCompleted signals to an <see cref="ISink{T}"/>
    /// while coordinating through two caller-owned fields passed by reference:
    /// a work-in-progress counter (<c>wip</c>) and a terminal-state slot (<c>error</c>).
    /// </summary>
    /// <remarks>
    /// Protocol as implemented by the methods of this class:
    /// <list type="bullet">
    /// <item><description>
    /// <see cref="ForwardOnNext{T}"/> claims <c>wip</c> by moving it from 0 to 1, forwards the item
    /// and then decrements <c>wip</c>.
    /// </description></item>
    /// <item><description>
    /// <see cref="ForwardOnError{T}"/> and <see cref="ForwardOnCompleted{T}"/> first record the terminal
    /// state in <c>error</c> and then increment <c>wip</c>. If the increment yields 1 they forward the
    /// terminal signal themselves; otherwise the OnNext call currently holding <c>wip</c> forwards it
    /// once its decrement leaves a non-zero value.
    /// </description></item>
    /// <item><description>
    /// <c>ExceptionHelper.Terminated</c> stored in <c>error</c> stands for completion; any other
    /// stored exception is forwarded as an error.
    /// </description></item>
    /// </list>
    /// </remarks>
    internal static class HalfSerializer
    {
        /// <summary>
        /// Forwards <paramref name="item"/> to <paramref name="sink"/> if no other forwarding is in
        /// progress, then forwards a terminal signal that was recorded while the item was being delivered.
        /// </summary>
        /// <typeparam name="T">The element type of the sink.</typeparam>
        /// <param name="sink">The sink receiving the forwarded signals.</param>
        /// <param name="item">The item to forward.</param>
        /// <param name="wip">The caller-owned work-in-progress counter.</param>
        /// <param name="error">The caller-owned terminal-state slot.</param>
        public static void ForwardOnNext<T>(ISink<T> sink, T item, ref int wip, ref Exception? error)
        {
            // Claim the slot only when nothing else is in progress (wip: 0 -> 1).
            if (Interlocked.CompareExchange(ref wip, 1, 0) == 0)
            {
                sink.ForwardOnNext(item);

                // A non-zero value after the decrement means wip was incremented while the item
                // was being forwarded; within this class only the terminal methods do that.
                if (Interlocked.Decrement(ref wip) != 0)
                {
                    // The terminal methods increment wip only after TrySetException reported success,
                    // so the slot is expected to hold a value here.
                    var ex = error!;

                    if (ex != ExceptionHelper.Terminated)
                    {
                        // A real exception was recorded: mark the slot as terminated, then forward it.
                        error = ExceptionHelper.Terminated;
                        sink.ForwardOnError(ex);
                    }
                    else
                    {
                        // The Terminated marker was recorded: forward completion.
                        sink.ForwardOnCompleted();
                    }
                }
            }
            else if (error is null)
            {
                // The slot could not be claimed and no terminal state has been recorded:
                // the item is not forwarded and only a warning is traced.
                Trace.TraceWarning("OnNext called while another OnNext call was in progress on the same Observer.");
            }
        }

        /// <summary>
        /// Records <paramref name="ex"/> as the terminal state and forwards it to
        /// <paramref name="sink"/> unless another forwarding currently holds <paramref name="wip"/>.
        /// </summary>
        /// <typeparam name="T">The element type of the sink.</typeparam>
        /// <param name="sink">The sink receiving the forwarded signal.</param>
        /// <param name="ex">The exception to forward.</param>
        /// <param name="wip">The caller-owned work-in-progress counter.</param>
        /// <param name="error">The caller-owned terminal-state slot.</param>
        public static void ForwardOnError<T>(ISink<T> sink, Exception ex, ref int wip, ref Exception? error)
        {
            // TrySetException is defined in ExceptionHelper (not visible here); presumably it
            // succeeds only while no terminal state has been stored yet - confirm against its source.
            if (ExceptionHelper.TrySetException(ref error, ex))
            {
                // An increment result of 1 means wip was 0, i.e. no ForwardOnNext holds the slot.
                if (Interlocked.Increment(ref wip) == 1)
                {
                    error = ExceptionHelper.Terminated;
                    sink.ForwardOnError(ex);
                }
            }
        }

        /// <summary>
        /// Records completion as the terminal state and forwards it to <paramref name="sink"/>
        /// unless another forwarding currently holds <paramref name="wip"/>.
        /// </summary>
        /// <typeparam name="T">The element type of the sink.</typeparam>
        /// <param name="sink">The sink receiving the forwarded signal.</param>
        /// <param name="wip">The caller-owned work-in-progress counter.</param>
        /// <param name="error">The caller-owned terminal-state slot.</param>
        public static void ForwardOnCompleted<T>(ISink<T> sink, ref int wip, ref Exception? error)
        {
            // Completion is recorded by storing the Terminated marker in the slot.
            if (ExceptionHelper.TrySetException(ref error, ExceptionHelper.Terminated))
            {
                // An increment result of 1 means wip was 0, i.e. no ForwardOnNext holds the slot.
                if (Interlocked.Increment(ref wip) == 1)
                {
                    sink.ForwardOnCompleted();
                }
            }
        }
    }
#nullable restore
}