<PackageReference Include="System.Reactive" Version="4.1.3" />

Multicast<TSource, TIntermediate, TResult>

sealed class Multicast<TSource, TIntermediate, TResult> : Producer<TResult, _<TSource, TIntermediate, TResult>>
using System.Reactive.Disposables;
using System.Reactive.Subjects;

namespace System.Reactive.Linq.ObservableImpl
{
    /// <summary>
    /// Producer whose sink, on each run, obtains a subject from the subject selector, wraps the
    /// source and that subject in a <c>ConnectableObservable</c>, passes the connectable sequence
    /// to the result selector, subscribes to the selector's output and then connects.
    /// </summary>
    /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
    /// <typeparam name="TIntermediate">Element type exposed by the subject / connectable sequence.</typeparam>
    /// <typeparam name="TResult">Element type of the sequence returned by the selector.</typeparam>
    internal sealed class Multicast<TSource, TIntermediate, TResult> : Producer<TResult, Multicast<TSource, TIntermediate, TResult>._>
    {
        private readonly IObservable<TSource> _source;
        private readonly Func<ISubject<TSource, TIntermediate>> _subjectSelector;
        private readonly Func<IObservable<TIntermediate>, IObservable<TResult>> _selector;

        /// <summary>
        /// Stores the source and the two selector delegates; no work is performed here.
        /// </summary>
        /// <param name="source">Sequence handed to the connectable observable.</param>
        /// <param name="subjectSelector">Factory invoked once per <c>Run</c> of a sink to obtain the subject.</param>
        /// <param name="selector">Function that maps the connectable sequence to the result sequence.</param>
        public Multicast(
            IObservable<TSource> source,
            Func<ISubject<TSource, TIntermediate>> subjectSelector,
            Func<IObservable<TIntermediate>, IObservable<TResult>> selector)
        {
            _source = source;
            _subjectSelector = subjectSelector;
            _selector = selector;
        }

        /// <summary>Creates a new sink wrapping the given observer.</summary>
        protected override _ CreateSink(IObserver<TResult> observer) => new _(observer);

        /// <summary>Starts the sink, passing this producer as its parent.</summary>
        protected override void Run(_ sink) => sink.Run(this);

        /// <summary>
        /// Sink that holds the connection returned by <c>Connect()</c> and releases it on disposal.
        /// </summary>
        internal sealed class _ : IdentitySink<TResult>
        {
            // Connection handle returned by Connect(); managed through the Disposable helper methods.
            private IDisposable _connection;

            public _(IObserver<TResult> observer)
                : base(observer)
            {
            }

            /// <summary>
            /// Builds the connectable sequence and the result sequence from the parent's selectors,
            /// runs the sink against the result sequence, and only afterwards connects.
            /// </summary>
            /// <param name="parent">Producer supplying the source and the selector delegates.</param>
            public void Run(Multicast<TSource, TIntermediate, TResult> parent)
            {
                var connectable = default(IConnectableObservable<TIntermediate>);
                var projected = default(IObservable<TResult>);

                // Only the selector invocations are guarded; a failure there is passed to
                // ForwardOnError and nothing is subscribed or connected.
                try
                {
                    var subject = parent._subjectSelector();
                    connectable = new ConnectableObservable<TSource, TIntermediate>(parent._source, subject);
                    projected = parent._selector(connectable);
                }
                catch (Exception exception)
                {
                    ForwardOnError(exception);
                    return;
                }

                // Order matters: run against the selector's output first, then connect.
                Run(projected);

                var connection = connectable.Connect();
                Disposable.SetSingle(ref _connection, connection);
            }

            /// <summary>
            /// On explicit disposal, disposes the stored connection before delegating to the base class.
            /// </summary>
            protected override void Dispose(bool disposing)
            {
                if (disposing)
                {
                    Disposable.TryDispose(ref _connection);
                }

                base.Dispose(disposing);
            }
        }
    }
}