<PackageReference Include="System.Reactive" Version="6.0.2" />

RefCount<TSource>

static class RefCount<TSource>
#nullable enable

using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Reactive.Subjects;
using System.Runtime.CompilerServices;
using System.Threading;

namespace System.Reactive.Linq.ObservableImpl
{
    /// <summary>
    /// Container for the two reference-counting producers that sit on top of an
    /// <see cref="IConnectableObservable{T}"/>: <see cref="Eager"/> and <see cref="Lazy"/>.
    /// </summary>
    /// <typeparam name="TSource">Element type of the connectable source.</typeparam>
    /// <remarks>
    /// Nullability is expressed with <c>?</c> annotations under <c>#nullable enable</c>;
    /// the compiler-reserved <c>Nullable</c>/<c>NullableContext</c> attributes must not be
    /// written by hand (the compiler rejects that with CS8623).
    /// </remarks>
    internal static class RefCount<TSource>
    {
        /// <summary>
        /// Connects the source once the number of sinks sharing the current connection
        /// reaches <c>minObservers</c>, and disposes that connection as soon as the count
        /// drops back to zero.
        /// </summary>
        internal sealed class Eager : Producer<TSource, Eager._>
        {
            private readonly IConnectableObservable<TSource> _source;

            /// <summary>Guards <see cref="_connection"/> and the counter stored inside it.</summary>
            private readonly object _gate = new object();

            /// <summary>
            /// The connection record currently shared by new sinks, or <c>null</c> when there is none.
            /// Read and written only while holding <see cref="_gate"/>.
            /// </summary>
            private RefConnection? _connection;

            private readonly int _minObservers;

            /// <summary>Stores the source and the sink count at which it gets connected.</summary>
            /// <param name="source">The connectable source shared by all sinks.</param>
            /// <param name="minObservers">Sink count that triggers <c>Connect()</c>.</param>
            public Eager(IConnectableObservable<TSource> source, int minObservers)
            {
                _source = source;
                _minObservers = minObservers;
            }

            /// <summary>Creates a sink bound to this producer for the given observer.</summary>
            protected override _ CreateSink(IObserver<TSource> observer) => new _(observer, this);

            /// <summary>Starts the sink.</summary>
            protected override void Run(_ sink) => sink.Run();

            /// <summary>
            /// Per-observer sink; it remembers which <see cref="RefConnection"/> it was counted
            /// against so that disposal only ever touches that connection.
            /// </summary>
            internal sealed class _ : IdentitySink<TSource>
            {
                private readonly Eager _parent;

                /// <summary>Connection this sink was counted against; cleared on disposal.</summary>
                private RefConnection? _targetConnection;

                public _(IObserver<TSource> observer, Eager parent)
                    : base(observer)
                {
                    _parent = parent;
                }

                /// <summary>
                /// Registers this sink with the current connection record (creating one if
                /// needed), runs the sink against the source, and then connects the source when
                /// this registration is the one that reached the observer threshold.
                /// </summary>
                public void Run()
                {
                    RefConnection connection;
                    bool shouldConnect;

                    lock (_parent._gate)
                    {
                        var current = _parent._connection;
                        if (current == null)
                        {
                            // No shared record yet: create one and publish it for later sinks.
                            current = new RefConnection();
                            _parent._connection = current;
                        }

                        connection = current;

                        // Connect only when this increment hits the threshold exactly and the
                        // record has not been given a connection handle yet.
                        shouldConnect = ++connection._count == _parent._minObservers
                            && connection._disposable.Disposable == null;

                        _targetConnection = connection;
                    }

                    // Base-class Run(source) is defined outside this file — presumably it
                    // subscribes this sink to the source. It is invoked before Connect(), outside
                    // the lock.
                    Run(_parent._source);

                    // The record may already have been disposed (by a sink's Dispose below), in
                    // which case connecting is skipped.
                    if (shouldConnect && !connection._disposable.IsDisposed)
                    {
                        connection._disposable.Disposable = _parent._source.Connect();
                    }
                }

                /// <summary>
                /// Releases this sink's registration; the sink that brings the counter of the
                /// still-current connection record to zero also disposes that connection.
                /// </summary>
                /// <param name="disposing">Forwarded to the base class; the registration is only released when <c>true</c>.</param>
                protected override void Dispose(bool disposing)
                {
                    base.Dispose(disposing);

                    if (!disposing)
                    {
                        return;
                    }

                    // NOTE(review): assumes Run() assigned _targetConnection before disposal and
                    // that this path executes at most once per sink — confirm against the base class.
                    var connection = _targetConnection!;
                    _targetConnection = null;

                    lock (_parent._gate)
                    {
                        // Nothing to do when the parent has moved on to another record, or when
                        // other sinks are still counted against this one. The decrement only
                        // happens when the record is still the current one (short-circuit).
                        if (connection != _parent._connection || --connection._count != 0)
                        {
                            return;
                        }

                        _parent._connection = null;
                    }

                    // Dispose outside the lock.
                    connection._disposable.Dispose();
                }
            }

            /// <summary>Counter plus connection handle shared by the sinks of one connection.</summary>
            private sealed class RefConnection
            {
                internal int _count;
                internal SingleAssignmentDisposableValue _disposable;
            }
        }

        /// <summary>
        /// Connects the source once the sink count reaches <c>minObservers</c> and, when the
        /// count returns to zero while connected, schedules the disconnect on a scheduler after
        /// <c>disconnectTime</c> instead of disconnecting immediately. A sink arriving before
        /// the scheduled action runs replaces the pending token so that action becomes a no-op.
        /// </summary>
        internal sealed class Lazy : Producer<TSource, Lazy._>
        {
            /// <summary>Connection/subscriber state; changed only while holding <see cref="_gate"/>.</summary>
            private enum State
            {
                DisconnectedNoSubscribers,
                DisconnectedWithSubscribers,
                ConnectedWithSubscribers,
                ConnectedWithNoSubscribers
            }

            private readonly object _gate;
            private readonly IScheduler _scheduler;
            private readonly TimeSpan _disconnectTime;
            private readonly IConnectableObservable<TSource> _source;
            private readonly int _minObservers;

            private State _state;

            /// <summary>
            /// Token for the current connection period; the scheduled disconnect only acts when
            /// this still references the token it was scheduled with.
            /// </summary>
            private IDisposable? _serial;

            private int _count;

            /// <summary>Handle returned by <c>Connect()</c>, or <c>null</c> while disconnected.</summary>
            private IDisposable? _connectableSubscription;

            /// <summary>Stores the configuration and starts in <see cref="State.DisconnectedNoSubscribers"/>.</summary>
            /// <param name="source">The connectable source shared by all sinks.</param>
            /// <param name="disconnectTime">Delay passed to the scheduler for the deferred disconnect.</param>
            /// <param name="scheduler">Scheduler on which the deferred disconnect is scheduled.</param>
            /// <param name="minObservers">Sink count that triggers <c>Connect()</c>.</param>
            public Lazy(IConnectableObservable<TSource> source, TimeSpan disconnectTime, IScheduler scheduler, int minObservers)
            {
                _source = source;
                _gate = new object();
                _disconnectTime = disconnectTime;
                _scheduler = scheduler;
                _minObservers = minObservers;
                _state = State.DisconnectedNoSubscribers;
            }

            /// <summary>Creates a sink for the given observer.</summary>
            protected override _ CreateSink(IObserver<TSource> observer) => new _(observer);

            /// <summary>Starts the sink against this producer.</summary>
            protected override void Run(_ sink) => sink.Run(this);

            /// <summary>Per-observer sink; all shared state lives on the parent <see cref="Lazy"/>.</summary>
            internal sealed class _ : IdentitySink<TSource>
            {
                public _(IObserver<TSource> observer)
                    : base(observer)
                {
                }

                /// <summary>
                /// Subscribes to the source, updates the parent's count and state (connecting if
                /// the threshold is reached), and installs the teardown that undoes the
                /// registration and may schedule a delayed disconnect.
                /// </summary>
                /// <param name="parent">The producer whose shared state this sink participates in.</param>
                public void Run(Lazy parent)
                {
                    // Subscribe before taking the lock and before any Connect().
                    var subscription = parent._source.SubscribeSafe(this);

                    lock (parent._gate)
                    {
                        var observerCount = ++parent._count;
                        var shouldConnect = false;
                        var shouldRenewToken = false;

                        switch (parent._state)
                        {
                            case State.DisconnectedNoSubscribers:
                            case State.DisconnectedWithSubscribers:
                                shouldConnect = observerCount == parent._minObservers;
                                parent._state = shouldConnect
                                    ? State.ConnectedWithSubscribers
                                    : State.DisconnectedWithSubscribers;
                                break;

                            case State.ConnectedWithNoSubscribers:
                                // A delayed disconnect may be pending; replacing the token below
                                // makes its identity check fail.
                                shouldRenewToken = true;
                                parent._state = State.ConnectedWithSubscribers;
                                break;

                            // State.ConnectedWithSubscribers: already connected, nothing to change.
                        }

                        if (shouldConnect)
                        {
                            parent._connectableSubscription = parent._source.Connect();
                        }

                        if (shouldConnect || shouldRenewToken)
                        {
                            // Disposable.TrySetSerial is defined outside this file — presumably it
                            // swaps in the new token and disposes the previous one; verify there.
                            Disposable.TrySetSerial(ref parent._serial, new SingleAssignmentDisposable());
                        }
                    }

                    // State travels through the tuple so the callbacks capture nothing.
                    // SetUpstream comes from the base class (not shown here).
                    SetUpstream(Disposable.Create((parent, subscription), tuple =>
                    {
                        var (owner, sourceSubscription) = tuple;

                        sourceSubscription.Dispose();

                        lock (owner._gate)
                        {
                            if (--owner._count != 0)
                            {
                                return;
                            }

                            if (owner._state != State.ConnectedWithSubscribers)
                            {
                                owner._state = State.DisconnectedNoSubscribers;
                                return;
                            }

                            owner._state = State.ConnectedWithNoSubscribers;

                            // In ConnectedWithSubscribers the token was installed by Run above as a
                            // SingleAssignmentDisposable, hence the cast.
                            var cancelable = (SingleAssignmentDisposable)Volatile.Read(ref owner._serial)!;

                            cancelable.Disposable = owner._scheduler.ScheduleAction(
                                (cancelable, owner),
                                owner._disconnectTime,
                                scheduled =>
                                {
                                    var (pendingToken, target) = scheduled;

                                    lock (target._gate)
                                    {
                                        // Disconnect only when no newer token replaced the one
                                        // this action was scheduled with.
                                        if (ReferenceEquals(Volatile.Read(ref target._serial), pendingToken))
                                        {
                                            target._state = State.DisconnectedNoSubscribers;

                                            var connection = target._connectableSubscription;
                                            target._connectableSubscription = null;
                                            connection!.Dispose();
                                        }
                                    }
                                });
                        }
                    }));
                }
            }
        }
    }
}