<PackageReference Include="System.Reactive" Version="4.2.0-preview.625" />

SafeObserver<TSource>

abstract class SafeObserver<TSource> : ISafeObserver<TSource>, IObserver<TSource>, IDisposable
using System.Reactive.Disposables; namespace System.Reactive { internal abstract class SafeObserver<TSource> : ISafeObserver<TSource>, IObserver<TSource>, IDisposable { private sealed class WrappingSafeObserver : SafeObserver<TSource> { private readonly IObserver<TSource> _observer; public WrappingSafeObserver(IObserver<TSource> observer) { _observer = observer; } public override void OnNext(TSource value) { bool flag = false; try { _observer.OnNext(value); flag = true; } finally { if (!flag) Dispose(); } } public override void OnError(Exception error) { using (this) _observer.OnError(error); } public override void OnCompleted() { using (this) _observer.OnCompleted(); } } private IDisposable _disposable; public static ISafeObserver<TSource> Wrap(IObserver<TSource> observer) { AnonymousObserver<TSource> anonymousObserver = observer as AnonymousObserver<TSource>; if (anonymousObserver != null) return anonymousObserver.MakeSafe(); return new WrappingSafeObserver(observer); } public abstract void OnNext(TSource value); public abstract void OnError(Exception error); public abstract void OnCompleted(); public void SetResource(IDisposable resource) { Disposable.SetSingle(ref _disposable, resource); } public void Dispose() { Dispose(true); } protected virtual void Dispose(bool disposing) { if (disposing) Disposable.TryDispose(ref _disposable); } } }