<PackageReference Include="System.Reactive" Version="6.0.0-preview.16" />

RefCount<TSource>

static class RefCount<TSource>
using System.Reactive.Concurrency; using System.Reactive.Disposables; using System.Reactive.Subjects; using System.Runtime.CompilerServices; using System.Threading; namespace System.Reactive.Linq.ObservableImpl { [System.Runtime.CompilerServices.NullableContext(1)] [System.Runtime.CompilerServices.Nullable(0)] internal static class RefCount<[System.Runtime.CompilerServices.Nullable(2)] TSource> { [System.Runtime.CompilerServices.Nullable(new byte[] { 0, 1, 1, 0 })] internal sealed class Eager : Producer<TSource, Eager._> { [System.Runtime.CompilerServices.Nullable(new byte[] { 0, 1 })] internal sealed class _ : IdentitySink<TSource> { [System.Runtime.CompilerServices.Nullable(new byte[] { 1, 0 })] private readonly Eager _parent; [System.Runtime.CompilerServices.Nullable(new byte[] { 2, 0 })] private RefConnection _targetConnection; public _(IObserver<TSource> observer, [System.Runtime.CompilerServices.Nullable(new byte[] { 1, 0 })] Eager parent) : base(observer) { _parent = parent; } public void Run() { RefConnection refConnection = default(RefConnection); bool flag = default(bool); lock (_parent._gate) { refConnection = _parent._connection; if (refConnection == null) { refConnection = new RefConnection(); _parent._connection = refConnection; } flag = (++refConnection._count == _parent._minObservers); _targetConnection = refConnection; } Run(_parent._source); if (flag && !refConnection._disposable.IsDisposed) refConnection._disposable.Disposable = _parent._source.Connect(); } protected override void Dispose(bool disposing) { base.Dispose(disposing); if (disposing) { RefConnection targetConnection = _targetConnection; _targetConnection = null; lock (_parent._gate) { if (targetConnection != _parent._connection || --targetConnection._count != 0) return; _parent._connection = null; } targetConnection._disposable.Dispose(); } } } [System.Runtime.CompilerServices.NullableContext(0)] private sealed class RefConnection { internal int _count; internal SingleAssignmentDisposableValue _disposable; } private readonly IConnectableObservable<TSource> _source; private readonly object _gate = new object(); [System.Runtime.CompilerServices.Nullable(new byte[] { 2, 0 })] private RefConnection _connection; private readonly int _minObservers; public Eager(IConnectableObservable<TSource> source, int minObservers) { _source = source; _minObservers = minObservers; } [return: System.Runtime.CompilerServices.Nullable(new byte[] { 1, 0 })] protected override _ CreateSink(IObserver<TSource> observer) { return new _(observer, this); } protected override void Run([System.Runtime.CompilerServices.Nullable(new byte[] { 1, 0 })] _ sink) { sink.Run(); } } [System.Runtime.CompilerServices.Nullable(new byte[] { 0, 1, 1, 0 })] internal sealed class Lazy : Producer<TSource, Lazy._> { [System.Runtime.CompilerServices.Nullable(new byte[] { 0, 1 })] internal sealed class _ : IdentitySink<TSource> { public _(IObserver<TSource> observer) : base(observer) { } public void Run([System.Runtime.CompilerServices.Nullable(new byte[] { 1, 0 })] Lazy parent) { IDisposable item = ObservableExtensions.SubscribeSafe<TSource>((IObservable<TSource>)parent._source, (IObserver<TSource>)this); lock (parent._gate) { if (++parent._count == parent._minObservers) { if (parent._connectableSubscription == null) parent._connectableSubscription = parent._source.Connect(); Disposable.TrySetSerial(ref parent._serial, new SingleAssignmentDisposable()); } } SetUpstream(Disposable.Create<(Lazy, IDisposable)>((parent, item), (Action<(Lazy, IDisposable)>)delegate((Lazy parent, IDisposable subscription) tuple) { Lazy item2 = tuple.parent; tuple.subscription.Dispose(); lock (item2._gate) { if (--item2._count == 0) { SingleAssignmentDisposable singleAssignmentDisposable = (SingleAssignmentDisposable)Volatile.Read<IDisposable>(ref item2._serial); singleAssignmentDisposable.Disposable = Scheduler.ScheduleAction<(SingleAssignmentDisposable, Lazy)>(item2._scheduler, (singleAssignmentDisposable, item2), item2._disconnectTime, (Action<(SingleAssignmentDisposable, Lazy)>)delegate((SingleAssignmentDisposable cancelable, Lazy closureParent) tuple2) { lock (tuple2.closureParent._gate) { if (Volatile.Read<IDisposable>(ref tuple2.closureParent._serial) == tuple2.cancelable) { tuple2.closureParent._connectableSubscription.Dispose(); tuple2.closureParent._connectableSubscription = null; } } }); } } })); } } private readonly object _gate; private readonly IScheduler _scheduler; private readonly TimeSpan _disconnectTime; private readonly IConnectableObservable<TSource> _source; private readonly int _minObservers; [System.Runtime.CompilerServices.Nullable(2)] private IDisposable _serial; private int _count; [System.Runtime.CompilerServices.Nullable(2)] private IDisposable _connectableSubscription; public Lazy(IConnectableObservable<TSource> source, TimeSpan disconnectTime, IScheduler scheduler, int minObservers) { _source = source; _gate = new object(); _disconnectTime = disconnectTime; _scheduler = scheduler; _minObservers = minObservers; } [return: System.Runtime.CompilerServices.Nullable(new byte[] { 1, 0 })] protected override _ CreateSink(IObserver<TSource> observer) { return new _(observer); } protected override void Run([System.Runtime.CompilerServices.Nullable(new byte[] { 1, 0 })] _ sink) { sink.Run(this); } } } }