RefCount<TSource>
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Reactive.Subjects;
using System.Threading;
namespace System.Reactive.Linq.ObservableImpl
{
internal static class RefCount<TSource>
{
internal sealed class Eager : Producer<TSource, Eager._>
{
internal sealed class _ : IdentitySink<TSource>
{
private readonly Eager _parent;
private RefConnection _targetConnection;
public _(IObserver<TSource> observer, Eager parent)
: base(observer)
{
_parent = parent;
}
public void Run()
{
bool flag = false;
RefConnection refConnection = null;
lock (_parent._gate) {
refConnection = _parent._connection;
if (refConnection == null) {
refConnection = new RefConnection();
_parent._connection = refConnection;
}
flag = (refConnection._count++ == 0);
_targetConnection = refConnection;
}
Run(_parent._source);
if (flag && !Disposable.GetIsDisposed(ref refConnection._disposable))
Disposable.SetSingle(ref refConnection._disposable, _parent._source.Connect());
}
protected override void Dispose(bool disposing)
{
base.Dispose(disposing);
if (disposing) {
RefConnection targetConnection = _targetConnection;
_targetConnection = null;
lock (_parent._gate) {
if (targetConnection != _parent._connection || --targetConnection._count != 0)
return;
_parent._connection = null;
}
Disposable.TryDispose(ref targetConnection._disposable);
}
}
}
private sealed class RefConnection
{
internal int _count;
internal IDisposable _disposable;
}
private readonly IConnectableObservable<TSource> _source;
private readonly object _gate;
private RefConnection _connection;
public Eager(IConnectableObservable<TSource> source)
{
_source = source;
_gate = new object();
}
protected override _ CreateSink(IObserver<TSource> observer)
{
return new _(observer, this);
}
protected override void Run(_ sink)
{
sink.Run();
}
}
internal sealed class Lazy : Producer<TSource, Lazy._>
{
internal sealed class _ : IdentitySink<TSource>
{
public _(IObserver<TSource> observer)
: base(observer)
{
}
public void Run(Lazy parent)
{
IDisposable item = ObservableExtensions.SubscribeSafe<TSource>((IObservable<TSource>)parent._source, (IObserver<TSource>)this);
lock (parent._gate) {
if (++parent._count == 1) {
if (parent._connectableSubscription == null)
parent._connectableSubscription = parent._source.Connect();
Disposable.TrySetSerial(ref parent._serial, new SingleAssignmentDisposable());
}
}
SetUpstream(Disposable.Create<(Lazy, IDisposable)>((parent, item), (Action<(Lazy, IDisposable)>)delegate((Lazy parent, IDisposable subscription) tuple) {
Lazy item2 = tuple.parent;
tuple.subscription.Dispose();
lock (item2._gate) {
if (--item2._count == 0) {
SingleAssignmentDisposable singleAssignmentDisposable = (SingleAssignmentDisposable)Volatile.Read<IDisposable>(ref item2._serial);
singleAssignmentDisposable.Disposable = Scheduler.ScheduleAction<(SingleAssignmentDisposable, Lazy)>(item2._scheduler, (singleAssignmentDisposable, item2), item2._disconnectTime, (Action<(SingleAssignmentDisposable, Lazy)>)delegate((SingleAssignmentDisposable cancelable, Lazy closureParent) tuple2) {
lock (tuple2.closureParent._gate) {
if (Volatile.Read<IDisposable>(ref tuple2.closureParent._serial) == tuple2.cancelable) {
tuple2.closureParent._connectableSubscription.Dispose();
tuple2.closureParent._connectableSubscription = null;
}
}
});
}
}
}));
}
}
private readonly object _gate;
private readonly IScheduler _scheduler;
private readonly TimeSpan _disconnectTime;
private readonly IConnectableObservable<TSource> _source;
private IDisposable _serial;
private int _count;
private IDisposable _connectableSubscription;
public Lazy(IConnectableObservable<TSource> source, TimeSpan disconnectTime, IScheduler scheduler)
{
_source = source;
_gate = new object();
_disconnectTime = disconnectTime;
_scheduler = scheduler;
}
protected override _ CreateSink(IObserver<TSource> observer)
{
return new _(observer);
}
protected override void Run(_ sink)
{
sink.Run(this);
}
}
}
}