HalfSerializer
using System.Diagnostics;
using System.Runtime.CompilerServices;
using System.Threading;
namespace System.Reactive
{
[System.Runtime.CompilerServices.NullableContext(1)]
[System.Runtime.CompilerServices.Nullable(0)]
internal static class HalfSerializer
{
public static void ForwardOnNext<[System.Runtime.CompilerServices.Nullable(2)] T>(System.Reactive.ISink<T> sink, T item, ref int wip, [System.Runtime.CompilerServices.Nullable(2)] ref Exception error)
{
if (Interlocked.CompareExchange(ref wip, 1, 0) == 0) {
sink.ForwardOnNext(item);
if (Interlocked.Decrement(ref wip) != 0) {
Exception ex = error;
if (ex != System.Reactive.ExceptionHelper.Terminated) {
error = System.Reactive.ExceptionHelper.Terminated;
sink.ForwardOnError(ex);
} else
sink.ForwardOnCompleted();
}
} else if (error == null) {
Trace.TraceWarning("OnNext called while another OnNext call was in progress on the same Observer.");
}
}
public static void ForwardOnError<[System.Runtime.CompilerServices.Nullable(2)] T>(System.Reactive.ISink<T> sink, Exception ex, ref int wip, [System.Runtime.CompilerServices.Nullable(2)] ref Exception error)
{
if (System.Reactive.ExceptionHelper.TrySetException(ref error, ex) && Interlocked.Increment(ref wip) == 1) {
error = System.Reactive.ExceptionHelper.Terminated;
sink.ForwardOnError(ex);
}
}
[System.Runtime.CompilerServices.NullableContext(2)]
public static void ForwardOnCompleted<T>([System.Runtime.CompilerServices.Nullable(1)] System.Reactive.ISink<T> sink, ref int wip, ref Exception error)
{
if (System.Reactive.ExceptionHelper.TrySetException(ref error, System.Reactive.ExceptionHelper.Terminated) && Interlocked.Increment(ref wip) == 1)
sink.ForwardOnCompleted();
}
}
}