<PackageReference Include="System.Reactive" Version="4.1.1" />

Using<TSource, TResource>

sealed class Using<TSource, TResource> : Producer<TSource, _<TSource, TResource>> where TResource : IDisposable
using System.Reactive.Disposables; namespace System.Reactive.Linq.ObservableImpl { internal sealed class Using<TSource, TResource> : Producer<TSource, Using<TSource, TResource>._> where TResource : IDisposable { internal sealed class _ : IdentitySink<TSource> { private IDisposable _disposable; public _(IObserver<TSource> observer) : base(observer) { } public void Run(Using<TSource, TResource> parent) { IObservable<TSource> observable = null; try { TResource val = parent._resourceFactory(); if (val != null) Disposable.SetSingle(ref _disposable, (IDisposable)(object)val); observable = parent._observableFactory(val); } catch (Exception exception) { SetUpstream(ObservableExtensions.SubscribeSafe<TSource>(Observable.Throw<TSource>(exception), (IObserver<TSource>)this)); return; } Run(observable); } protected override void Dispose(bool disposing) { if (disposing) Disposable.TryDispose(ref _disposable); base.Dispose(disposing); } } private readonly Func<TResource> _resourceFactory; private readonly Func<TResource, IObservable<TSource>> _observableFactory; public Using(Func<TResource> resourceFactory, Func<TResource, IObservable<TSource>> observableFactory) { _resourceFactory = resourceFactory; _observableFactory = observableFactory; } protected override _ CreateSink(IObserver<TSource> observer) { return new _(observer); } protected override void Run(_ sink) { sink.Run(this); } } }