<PackageReference Include="System.Reactive" Version="4.1.5" />

RefCount<TSource>

static class RefCount<TSource>
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Reactive.Subjects;
using System.Threading;

namespace System.Reactive.Linq.ObservableImpl
{
    /// <summary>
    /// Producers that keep an <see cref="IConnectableObservable{T}"/> connected while it has
    /// observers. <see cref="Eager"/> disposes the connection as soon as the observer count
    /// returns to zero; <see cref="Lazy"/> schedules the disposal after a configurable delay.
    /// </summary>
    internal static class RefCount<TSource>
    {
        /// <summary>
        /// Connects when the first observer of a connection arrives and disposes that
        /// connection when its last observer is disposed.
        /// </summary>
        internal sealed class Eager : Producer<TSource, Eager._>
        {
            private readonly IConnectableObservable<TSource> _source;

            // Guards _connection and the _count of whichever RefConnection is active.
            private readonly object _gate = new object();

            // The connection new observers attach to; null when there is none.
            private RefConnection _connection;

            public Eager(IConnectableObservable<TSource> source)
            {
                _source = source;
            }

            protected override _ CreateSink(IObserver<TSource> observer) => new _(observer, this);

            protected override void Run(_ sink) => sink.Run();

            /// <summary>
            /// Per-connection state: how many sinks share it and the disposable slot that
            /// receives the result of <c>Connect()</c>.
            /// </summary>
            private sealed class RefConnection
            {
                internal int _count;
                internal IDisposable _disposable;
            }

            /// <summary>
            /// Sink created for each observer; remembers which connection it was counted on.
            /// </summary>
            internal sealed class _ : IdentitySink<TSource>
            {
                private readonly Eager _parent;
                private RefConnection _targetConnection;

                public _(IObserver<TSource> observer, Eager parent)
                    : base(observer)
                {
                    _parent = parent;
                }

                /// <summary>
                /// Registers this sink on the parent's active connection (creating one if
                /// needed), runs the inherited <c>Run(source)</c>, and then connects the source
                /// if this sink was the first one counted on that connection.
                /// </summary>
                public void Run()
                {
                    RefConnection connection;
                    bool isFirstObserver;

                    lock (_parent._gate)
                    {
                        // Reuse the active connection, or install a fresh one as the active one.
                        connection = _parent._connection ?? (_parent._connection = new RefConnection());

                        isFirstObserver = connection._count == 0;
                        connection._count++;

                        // Remember the connection so Dispose can tell whether it is still current.
                        _targetConnection = connection;
                    }

                    // Inherited overload defined outside this file; presumably subscribes this
                    // sink to the source. It is deliberately invoked before Connect().
                    Run(_parent._source);

                    if (!isFirstObserver)
                    {
                        return;
                    }

                    // Do not connect if the slot has already been disposed in the meantime
                    // (Dispose below calls TryDispose on it once the count reaches zero).
                    if (Disposable.GetIsDisposed(ref connection._disposable))
                    {
                        return;
                    }

                    Disposable.SetSingle(ref connection._disposable, _parent._source.Connect());
                }

                /// <summary>
                /// Releases this sink's share of its connection. When the connection is still
                /// the parent's active one and this was its last sink, the parent forgets the
                /// connection and its disposable slot is disposed.
                /// </summary>
                protected override void Dispose(bool disposing)
                {
                    base.Dispose(disposing);

                    if (!disposing)
                    {
                        return;
                    }

                    var connection = _targetConnection;
                    _targetConnection = null;

                    bool isLastObserver;

                    lock (_parent._gate)
                    {
                        // The count is only decremented when the saved connection is still the
                        // active one (the && short-circuits otherwise).
                        isLastObserver = connection == _parent._connection && --connection._count == 0;

                        if (isLastObserver)
                        {
                            _parent._connection = null;
                        }
                    }

                    // Disposal happens outside the lock.
                    if (isLastObserver)
                    {
                        Disposable.TryDispose(ref connection._disposable);
                    }
                }
            }
        }

        /// <summary>
        /// Connects when the observer count goes from zero to one and, when it returns to
        /// zero, schedules the disposal of the connection on the given scheduler after
        /// <c>disconnectTime</c>.
        /// </summary>
        internal sealed class Lazy : Producer<TSource, Lazy._>
        {
            // Guards _count, _connectableSubscription and the disconnect decision.
            private readonly object _gate = new object();
            private readonly IScheduler _scheduler;
            private readonly TimeSpan _disconnectTime;
            private readonly IConnectableObservable<TSource> _source;

            // Holds the SingleAssignmentDisposable belonging to the current "generation" of
            // observers; a scheduled disconnect only acts if its own instance is still here.
            private IDisposable _serial;
            private int _count;
            private IDisposable _connectableSubscription;

            public Lazy(IConnectableObservable<TSource> source, TimeSpan disconnectTime, IScheduler scheduler)
            {
                _source = source;
                _disconnectTime = disconnectTime;
                _scheduler = scheduler;
            }

            protected override _ CreateSink(IObserver<TSource> observer) => new _(observer);

            protected override void Run(_ sink) => sink.Run(this);

            /// <summary>
            /// Sink created for each observer of the lazily disconnecting producer.
            /// </summary>
            internal sealed class _ : IdentitySink<TSource>
            {
                public _(IObserver<TSource> observer)
                    : base(observer)
                {
                }

                /// <summary>
                /// Subscribes this sink to the parent's source, counts it, connects the source
                /// if it is the first observer and no connection is held, and installs an
                /// upstream disposable that undoes the registration.
                /// </summary>
                /// <param name="parent">The producer whose shared state this sink updates.</param>
                public void Run(Lazy parent)
                {
                    var subscription = parent._source.SubscribeSafe(this);

                    lock (parent._gate)
                    {
                        parent._count++;

                        if (parent._count == 1)
                        {
                            // A connection may still be held if an earlier disconnect has not
                            // run yet; only connect when there is none.
                            if (parent._connectableSubscription == null)
                            {
                                parent._connectableSubscription = parent._source.Connect();
                            }

                            // Swap in a fresh token so a previously scheduled disconnect no
                            // longer matches _serial (presumably TrySetSerial also disposes the
                            // previous token - confirm against Disposable.TrySetSerial).
                            Disposable.TrySetSerial(ref parent._serial, new SingleAssignmentDisposable());
                        }
                    }

                    // State is passed explicitly so the callback captures nothing.
                    SetUpstream(Disposable.Create((parent, subscription), state => Release(state.Item1, state.Item2)));
                }

                /// <summary>
                /// Disposes one observer's subscription and, if it was the last observer,
                /// schedules <see cref="Disconnect"/> after the owner's disconnect time.
                /// </summary>
                private static void Release(Lazy owner, IDisposable subscription)
                {
                    subscription.Dispose();

                    lock (owner._gate)
                    {
                        owner._count--;

                        if (owner._count != 0)
                        {
                            return;
                        }

                        // Run installs a SingleAssignmentDisposable in _serial when the count
                        // becomes one, which is what this cast relies on.
                        var token = (SingleAssignmentDisposable)Volatile.Read(ref owner._serial);

                        token.Disposable = owner._scheduler.ScheduleAction(
                            (token, owner),
                            owner._disconnectTime,
                            state => Disconnect(state.Item1, state.Item2));
                    }
                }

                /// <summary>
                /// Disposes and clears the owner's connection, but only if <paramref name="token"/>
                /// is still the instance stored in the owner's <c>_serial</c> field.
                /// </summary>
                private static void Disconnect(SingleAssignmentDisposable token, Lazy owner)
                {
                    lock (owner._gate)
                    {
                        // A different token means a new observer arrived after this disconnect
                        // was scheduled; leave the connection alone.
                        if (!ReferenceEquals(Volatile.Read(ref owner._serial), token))
                        {
                            return;
                        }

                        owner._connectableSubscription.Dispose();
                        owner._connectableSubscription = null;
                    }
                }
            }
        }
    }
}