<PackageReference Include="System.Reactive" Version="6.0.0-preview.16" />

AnonymousSafeObserver<T>

sealed class AnonymousSafeObserver<T> : SafeObserver<T>
This class fuses logic from ObserverBase, AnonymousObserver, and SafeObserver into one class. When an observer needs to be safeguarded, an instance of this type can be created by SafeObserver.Create when it detects its input is an AnonymousObserver, which is commonly used by end users when using the Subscribe extension methods that accept delegates for the On* handlers. By doing the fusion, we make the call stack depth shorter which helps debugging and some performance.
using System.Runtime.CompilerServices; using System.Threading; namespace System.Reactive { [System.Runtime.CompilerServices.NullableContext(1)] [System.Runtime.CompilerServices.Nullable(new byte[] { 0, 1 })] internal sealed class AnonymousSafeObserver<[System.Runtime.CompilerServices.Nullable(2)] T> : SafeObserver<T> { private readonly Action<T> _onNext; private readonly Action<Exception> _onError; private readonly Action _onCompleted; private int _isStopped; public AnonymousSafeObserver(Action<T> onNext, Action<Exception> onError, Action onCompleted) { _onNext = onNext; _onError = onError; _onCompleted = onCompleted; } public override void OnNext(T value) { if (_isStopped == 0) { bool flag = false; try { _onNext(value); flag = true; } finally { if (!flag) Dispose(); } } } public override void OnError(Exception error) { if (Interlocked.Exchange(ref _isStopped, 1) == 0) { using (this) _onError(error); } } public override void OnCompleted() { if (Interlocked.Exchange(ref _isStopped, 1) == 0) { using (this) _onCompleted(); } } } }