RefCount<TSource>
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Reactive.Subjects;
using System.Runtime.CompilerServices;
using System.Threading;
namespace System.Reactive.Linq.ObservableImpl
{
[System.Runtime.CompilerServices.NullableContext(1)]
[System.Runtime.CompilerServices.Nullable(0)]
internal static class RefCount<[System.Runtime.CompilerServices.Nullable(2)] TSource>
{
[System.Runtime.CompilerServices.Nullable(new byte[] {
0,
1,
1,
0
})]
internal sealed class Eager : Producer<TSource, Eager._>
{
[System.Runtime.CompilerServices.Nullable(new byte[] {
0,
1
})]
internal sealed class _ : IdentitySink<TSource>
{
[System.Runtime.CompilerServices.Nullable(new byte[] {
1,
0
})]
private readonly Eager _parent;
[System.Runtime.CompilerServices.Nullable(new byte[] {
2,
0
})]
private RefConnection _targetConnection;
public _(IObserver<TSource> observer, [System.Runtime.CompilerServices.Nullable(new byte[] {
1,
0
})] Eager parent)
: base(observer)
{
_parent = parent;
}
public void Run()
{
RefConnection refConnection = default(RefConnection);
bool flag = default(bool);
lock (_parent._gate) {
refConnection = _parent._connection;
if (refConnection == null) {
refConnection = new RefConnection();
_parent._connection = refConnection;
}
flag = (++refConnection._count == _parent._minObservers);
_targetConnection = refConnection;
}
Run(_parent._source);
if (flag && !refConnection._disposable.IsDisposed)
refConnection._disposable.Disposable = _parent._source.Connect();
}
protected override void Dispose(bool disposing)
{
base.Dispose(disposing);
if (disposing) {
RefConnection targetConnection = _targetConnection;
_targetConnection = null;
lock (_parent._gate) {
if (targetConnection != _parent._connection || --targetConnection._count != 0)
return;
_parent._connection = null;
}
targetConnection._disposable.Dispose();
}
}
}
[System.Runtime.CompilerServices.NullableContext(0)]
private sealed class RefConnection
{
internal int _count;
internal SingleAssignmentDisposableValue _disposable;
}
private readonly IConnectableObservable<TSource> _source;
private readonly object _gate = new object();
[System.Runtime.CompilerServices.Nullable(new byte[] {
2,
0
})]
private RefConnection _connection;
private readonly int _minObservers;
public Eager(IConnectableObservable<TSource> source, int minObservers)
{
_source = source;
_minObservers = minObservers;
}
[return: System.Runtime.CompilerServices.Nullable(new byte[] {
1,
0
})]
protected override _ CreateSink(IObserver<TSource> observer)
{
return new _(observer, this);
}
protected override void Run([System.Runtime.CompilerServices.Nullable(new byte[] {
1,
0
})] _ sink)
{
sink.Run();
}
}
[System.Runtime.CompilerServices.Nullable(new byte[] {
0,
1,
1,
0
})]
internal sealed class Lazy : Producer<TSource, Lazy._>
{
[System.Runtime.CompilerServices.Nullable(new byte[] {
0,
1
})]
internal sealed class _ : IdentitySink<TSource>
{
public _(IObserver<TSource> observer)
: base(observer)
{
}
public void Run([System.Runtime.CompilerServices.Nullable(new byte[] {
1,
0
})] Lazy parent)
{
IDisposable item = ObservableExtensions.SubscribeSafe<TSource>((IObservable<TSource>)parent._source, (IObserver<TSource>)this);
lock (parent._gate) {
if (++parent._count == parent._minObservers) {
if (parent._connectableSubscription == null)
parent._connectableSubscription = parent._source.Connect();
Disposable.TrySetSerial(ref parent._serial, new SingleAssignmentDisposable());
}
}
SetUpstream(Disposable.Create<(Lazy, IDisposable)>((parent, item), (Action<(Lazy, IDisposable)>)delegate((Lazy parent, IDisposable subscription) tuple) {
Lazy item2 = tuple.parent;
tuple.subscription.Dispose();
lock (item2._gate) {
if (--item2._count == 0) {
SingleAssignmentDisposable singleAssignmentDisposable = (SingleAssignmentDisposable)Volatile.Read<IDisposable>(ref item2._serial);
singleAssignmentDisposable.Disposable = Scheduler.ScheduleAction<(SingleAssignmentDisposable, Lazy)>(item2._scheduler, (singleAssignmentDisposable, item2), item2._disconnectTime, (Action<(SingleAssignmentDisposable, Lazy)>)delegate((SingleAssignmentDisposable cancelable, Lazy closureParent) tuple2) {
lock (tuple2.closureParent._gate) {
if (Volatile.Read<IDisposable>(ref tuple2.closureParent._serial) == tuple2.cancelable) {
tuple2.closureParent._connectableSubscription.Dispose();
tuple2.closureParent._connectableSubscription = null;
}
}
});
}
}
}));
}
}
private readonly object _gate;
private readonly IScheduler _scheduler;
private readonly TimeSpan _disconnectTime;
private readonly IConnectableObservable<TSource> _source;
private readonly int _minObservers;
[System.Runtime.CompilerServices.Nullable(2)]
private IDisposable _serial;
private int _count;
[System.Runtime.CompilerServices.Nullable(2)]
private IDisposable _connectableSubscription;
public Lazy(IConnectableObservable<TSource> source, TimeSpan disconnectTime, IScheduler scheduler, int minObservers)
{
_source = source;
_gate = new object();
_disconnectTime = disconnectTime;
_scheduler = scheduler;
_minObservers = minObservers;
}
[return: System.Runtime.CompilerServices.Nullable(new byte[] {
1,
0
})]
protected override _ CreateSink(IObserver<TSource> observer)
{
return new _(observer);
}
protected override void Run([System.Runtime.CompilerServices.Nullable(new byte[] {
1,
0
})] _ sink)
{
sink.Run(this);
}
}
}
}