<PackageReference Include="System.Reactive" Version="5.0.0-preview.16" />
EventProducer<TDelegate, TArgs>
using System.
Reactive.
Concurrency;
using System.
Reactive.
Disposables;
using System.
Reactive.
Subjects;
namespace System.
Reactive.
Linq.
ObservableImpl
{
/// <summary>
/// Base class for producers that feed observers of <typeparamref name="TArgs"/> from a handler
/// of type <typeparamref name="TDelegate"/> registered through <see cref="AddHandler"/>.
/// All observers subscribed at the same time share one <see cref="Session"/>, i.e. one
/// registered handler feeding one subject.
/// </summary>
/// <typeparam name="TDelegate">Type of the handler produced by <see cref="GetHandler"/>.</typeparam>
/// <typeparam name="TArgs">Type of the elements pushed to observers.</typeparam>
internal abstract class EventProducer<TDelegate, TArgs> : BasicProducer<TArgs>
{
    /// <summary>
    /// Holds a single handler registration that feeds a subject, and counts the observers
    /// currently connected to that subject.
    /// </summary>
    private sealed class Session
    {
        private readonly EventProducer<TDelegate, TArgs> _parent;
        private readonly Subject<TArgs> _subject;

        // Created by Initialize; receives the disposable returned by the parent's AddHandler.
        private SingleAssignmentDisposable _removeHandler;

        // Number of connected observers. Incremented in Connect (which Run calls while holding
        // the parent's _gate) and decremented under that same lock when a connection is disposed.
        private int _count;

        /// <summary>Creates a session bound to <paramref name="parent"/> with a fresh subject.</summary>
        /// <param name="parent">Producer whose scheduler, gate and handler methods this session uses.</param>
        public Session(EventProducer<TDelegate, TArgs> parent)
        {
            _parent = parent;
            _subject = new Subject<TArgs>();
        }

        /// <summary>
        /// Subscribes <paramref name="observer"/> to the session's subject and, for the first
        /// connection, schedules the handler registration.
        /// </summary>
        /// <param name="observer">Observer to connect.</param>
        /// <returns>
        /// A disposable that disconnects the observer and, when the last connection goes away,
        /// schedules disposal of the handler registration and clears the parent's session.
        /// If initialization throws, the error is sent to <paramref name="observer"/> and
        /// <see cref="Disposable.Empty"/> is returned.
        /// </returns>
        public IDisposable Connect(IObserver<TArgs> observer)
        {
            // The observer is attached to the subject before any handler is registered
            // (presumably so nothing is missed if the scheduler runs the registration
            // synchronously - confirm against the scheduler in use).
            IDisposable connection = _subject.Subscribe(observer);

            if (++_count == 1)
            {
                try
                {
                    Initialize();
                }
                catch (Exception error)
                {
                    // Roll back the connection that was just made and report the failure.
                    _count--;
                    connection.Dispose();
                    observer.OnError(error);
                    return Disposable.Empty;
                }
            }

            // Everything the callback needs travels in the state tuple, so the lambda captures nothing.
            return Disposable.Create(
                (this, _parent, connection),
                state =>
                {
                    var (session, parent, subscription) = state;

                    subscription.Dispose();

                    lock (parent._gate)
                    {
                        if (--session._count == 0)
                        {
                            // Last observer gone: dispose the handler registration on the
                            // parent's scheduler and let the next Run create a new session.
                            Scheduler.ScheduleAction(
                                parent._scheduler,
                                session._removeHandler,
                                handler => handler.Dispose());
                            parent._session = null;
                        }
                    }
                });
        }

        /// <summary>
        /// Allocates the removal slot, obtains the handler that forwards to the subject's
        /// OnNext, and schedules <see cref="AddHandler"/> on the parent's scheduler.
        /// </summary>
        private void Initialize()
        {
            _removeHandler = new SingleAssignmentDisposable();

            TDelegate handler = _parent.GetHandler(_subject.OnNext);

            // Typed local so the method group binds to the Action<TState> overload unambiguously.
            Action<TDelegate> register = AddHandler;
            Scheduler.ScheduleAction(_parent._scheduler, handler, register);
        }

        /// <summary>
        /// Registers <paramref name="onNext"/> through the parent and stores the returned
        /// disposable in the removal slot. A registration failure is sent to the subject's
        /// OnError instead of being thrown.
        /// </summary>
        /// <param name="onNext">Handler to register.</param>
        private void AddHandler(TDelegate onNext)
        {
            IDisposable removal;
            try
            {
                removal = _parent.AddHandler(onNext);
            }
            catch (Exception error)
            {
                _subject.OnError(error);
                return;
            }

            _removeHandler.Disposable = removal;
        }
    }

    private readonly IScheduler _scheduler;
    private readonly object _gate;

    // Current shared session; null when no observer is connected. Accessed under _gate.
    private Session _session;

    /// <summary>Initializes the producer.</summary>
    /// <param name="scheduler">Scheduler on which handler registration and removal are scheduled.</param>
    protected EventProducer(IScheduler scheduler)
    {
        _scheduler = scheduler;
        _gate = new object();
    }

    /// <summary>Builds the handler that forwards received data to <paramref name="onNext"/>.</summary>
    /// <param name="onNext">Callback that pushes an element to the connected observers.</param>
    /// <returns>The handler to pass to <see cref="AddHandler"/>.</returns>
    protected abstract TDelegate GetHandler(Action<TArgs> onNext);

    /// <summary>Registers <paramref name="handler"/> with the underlying source.</summary>
    /// <param name="handler">Handler obtained from <see cref="GetHandler"/>.</param>
    /// <returns>A disposable that the session disposes once its last observer disconnects.</returns>
    protected abstract IDisposable AddHandler(TDelegate handler);

    /// <summary>
    /// Connects <paramref name="observer"/> to the current session, creating the session first
    /// if none exists. Both steps run under the gate.
    /// </summary>
    /// <param name="observer">Observer to connect.</param>
    /// <returns>The disposable returned by the session for this connection.</returns>
    protected override IDisposable Run(IObserver<TArgs> observer)
    {
        lock (_gate)
        {
            if (_session == null)
            {
                _session = new Session(this);
            }

            return _session.Connect(observer);
        }
    }
}
}