<PackageReference Include="System.Reactive" Version="6.0.0-preview.9" />

HalfSerializer

static class HalfSerializer
Utility methods for dealing with serializing OnXXX signals for an IObserver where concurrent OnNext is still not allowed but concurrent OnError/OnCompleted may happen. This serialization case is generally lower overhead than a full SerializedObserver wrapper and doesn't need allocation.
using System.Diagnostics; using System.Runtime.CompilerServices; using System.Threading; namespace System.Reactive { [System.Runtime.CompilerServices.NullableContext(1)] [System.Runtime.CompilerServices.Nullable(0)] internal static class HalfSerializer { public static void ForwardOnNext<[System.Runtime.CompilerServices.Nullable(2)] T>(ISink<T> sink, T item, ref int wip, [System.Runtime.CompilerServices.Nullable(2)] ref Exception error) { if (Interlocked.CompareExchange(ref wip, 1, 0) == 0) { sink.ForwardOnNext(item); if (Interlocked.Decrement(ref wip) != 0) { Exception ex = error; if (ex != ExceptionHelper.Terminated) { error = ExceptionHelper.Terminated; sink.ForwardOnError(ex); } else sink.ForwardOnCompleted(); } } else if (error == null) { Trace.TraceWarning("OnNext called while another OnNext call was in progress on the same Observer."); } } public static void ForwardOnError<[System.Runtime.CompilerServices.Nullable(2)] T>(ISink<T> sink, Exception ex, ref int wip, [System.Runtime.CompilerServices.Nullable(2)] ref Exception error) { if (ExceptionHelper.TrySetException(ref error, ex) && Interlocked.Increment(ref wip) == 1) { error = ExceptionHelper.Terminated; sink.ForwardOnError(ex); } } [System.Runtime.CompilerServices.NullableContext(2)] public static void ForwardOnCompleted<T>([System.Runtime.CompilerServices.Nullable(1)] ISink<T> sink, ref int wip, ref Exception error) { if (ExceptionHelper.TrySetException(ref error, ExceptionHelper.Terminated) && Interlocked.Increment(ref wip) == 1) sink.ForwardOnCompleted(); } } }