<PackageReference Include="System.Reactive" Version="5.0.0-preview.16" />

Delay<TSource, TDelay>

static class Delay<TSource, TDelay>
using System.Reactive.Disposables;

namespace System.Reactive.Linq.ObservableImpl
{
    /// <summary>
    /// Producers for the selector-based delay operator: each source element is held back
    /// until the observable returned for it by a delay selector produces a value or completes.
    /// </summary>
    /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
    /// <typeparam name="TDelay">Element type of the delay observables; their values are ignored.</typeparam>
    internal static class Delay<TSource, TDelay>
    {
        /// <summary>
        /// Shared producer base; keeps the source sequence for the concrete variants.
        /// </summary>
        /// <typeparam name="TParent">The concrete producer type handed to the sink when it runs.</typeparam>
        internal abstract class Base<TParent> : Producer<TSource, Base<TParent>._>
            where TParent : Base<TParent>
        {
            /// <summary>The sequence whose elements are delayed.</summary>
            protected readonly IObservable<TSource> _source;

            protected Base(IObservable<TSource> source)
            {
                _source = source;
            }

            /// <summary>
            /// Sink that starts one delay subscription per source element and forwards the element
            /// downstream once that delay signals.
            /// </summary>
            internal abstract class _ : IdentitySink<TSource>
            {
                // Holds the observers of all delays that have not signalled yet.
                private readonly CompositeDisposable _delays = new CompositeDisposable();

                // Guards every downstream notification and the completion bookkeeping.
                private readonly object _gate = new object();

                private readonly Func<TSource, IObservable<TDelay>> _delaySelector;

                // Set once the source has completed; completion is forwarded when no delays remain.
                private bool _atEnd;

                private IDisposable _subscription;

                protected _(Func<TSource, IObservable<TDelay>> delaySelector, IObserver<TSource> observer)
                    : base(observer)
                {
                    _delaySelector = delaySelector;
                }

                /// <summary>
                /// Resets the end-of-source flag and stores the disposable produced by
                /// <see cref="RunCore"/> as this sink's upstream subscription.
                /// </summary>
                public void Run(TParent parent)
                {
                    _atEnd = false;

                    Disposable.SetSingle(ref _subscription, RunCore(parent));
                }

                /// <summary>
                /// On an explicit dispose, releases the upstream subscription and all outstanding delays
                /// before delegating to the base implementation.
                /// </summary>
                protected override void Dispose(bool disposing)
                {
                    if (disposing)
                    {
                        Disposable.TryDispose(ref _subscription);
                        _delays.Dispose();
                    }

                    base.Dispose(disposing);
                }

                /// <summary>Establishes the upstream subscription for the concrete variant.</summary>
                protected abstract IDisposable RunCore(TParent parent);

                /// <summary>
                /// Asks the selector for the element's delay and subscribes a <see cref="DelayObserver"/>
                /// to it. A selector exception is forwarded downstream as an error.
                /// </summary>
                public override void OnNext(TSource value)
                {
                    IObservable<TDelay> delay;

                    try
                    {
                        delay = _delaySelector(value);
                    }
                    catch (Exception error)
                    {
                        lock (_gate)
                        {
                            ForwardOnError(error);
                        }

                        return;
                    }

                    // Register the observer before subscribing so that a delay which signals
                    // during subscription is already tracked in _delays.
                    var pending = new DelayObserver(this, value);
                    _delays.Add(pending);
                    pending.SetResource(delay.SubscribeSafe(pending));
                }

                /// <summary>Forwards a source error downstream under the gate.</summary>
                public override void OnError(Exception error)
                {
                    lock (_gate)
                    {
                        ForwardOnError(error);
                    }
                }

                /// <summary>
                /// Marks the source as finished, releases the upstream subscription and completes
                /// downstream if no delays are outstanding.
                /// </summary>
                public override void OnCompleted()
                {
                    lock (_gate)
                    {
                        _atEnd = true;
                        Disposable.TryDispose(ref _subscription);

                        CheckDone();
                    }
                }

                // Completes downstream once the source has ended and every delay has been removed.
                // All callers in this class invoke it while holding _gate.
                private void CheckDone()
                {
                    if (_atEnd && _delays.Count == 0)
                    {
                        ForwardOnCompleted();
                    }
                }

                /// <summary>
                /// Observer of a single element's delay; carries the element it will release.
                /// </summary>
                private sealed class DelayObserver : SafeObserver<TDelay>
                {
                    private readonly _ _parent;
                    private readonly TSource _value;

                    public DelayObserver(_ parent, TSource value)
                    {
                        _parent = parent;
                        _value = value;
                    }

                    /// <summary>A value from the delay releases the held element.</summary>
                    public override void OnNext(TDelay value) => Release();

                    /// <summary>A failing delay fails the whole sequence.</summary>
                    public override void OnError(Exception error)
                    {
                        lock (_parent._gate)
                        {
                            _parent.ForwardOnError(error);
                        }
                    }

                    /// <summary>A delay that completes releases the held element as well.</summary>
                    public override void OnCompleted() => Release();

                    // Emits the held element, drops this observer from the pending set and
                    // lets the parent check whether the whole sequence is now finished.
                    private void Release()
                    {
                        lock (_parent._gate)
                        {
                            _parent.ForwardOnNext(_value);

                            _parent._delays.Remove(this);
                            _parent.CheckDone();
                        }
                    }
                }
            }
        }

        /// <summary>
        /// Variant that subscribes to the source immediately and delays each element individually.
        /// </summary>
        internal class Selector : Base<Selector>
        {
            private readonly Func<TSource, IObservable<TDelay>> _delaySelector;

            public Selector(IObservable<TSource> source, Func<TSource, IObservable<TDelay>> delaySelector)
                : base(source)
            {
                _delaySelector = delaySelector;
            }

            protected override Base<Selector>._ CreateSink(IObserver<TSource> observer) => new _(_delaySelector, observer);

            protected override void Run(Base<Selector>._ sink) => sink.Run(this);

            /// <summary>Sink that subscribes itself directly to the parent's source.</summary>
            private new sealed class _ : Base<Selector>._
            {
                public _(Func<TSource, IObservable<TDelay>> delaySelector, IObserver<TSource> observer)
                    : base(delaySelector, observer)
                {
                }

                protected override IDisposable RunCore(Selector parent) => parent._source.SubscribeSafe(this);
            }
        }

        /// <summary>
        /// Variant that subscribes to the source only after a separate subscription-delay
        /// observable produces a value or completes.
        /// </summary>
        internal sealed class SelectorWithSubscriptionDelay : Base<SelectorWithSubscriptionDelay>
        {
            private readonly IObservable<TDelay> _subscriptionDelay;
            private readonly Func<TSource, IObservable<TDelay>> _delaySelector;

            public SelectorWithSubscriptionDelay(IObservable<TSource> source, IObservable<TDelay> subscriptionDelay, Func<TSource, IObservable<TDelay>> delaySelector)
                : base(source)
            {
                _subscriptionDelay = subscriptionDelay;
                _delaySelector = delaySelector;
            }

            protected override Base<SelectorWithSubscriptionDelay>._ CreateSink(IObserver<TSource> observer) => new _(_delaySelector, observer);

            protected override void Run(Base<SelectorWithSubscriptionDelay>._ sink) => sink.Run(this);

            /// <summary>
            /// Sink whose upstream subscription is a <see cref="SubscriptionDelayObserver"/>
            /// listening to the subscription delay.
            /// </summary>
            private new sealed class _ : Base<SelectorWithSubscriptionDelay>._
            {
                public _(Func<TSource, IObservable<TDelay>> delaySelector, IObserver<TSource> observer)
                    : base(delaySelector, observer)
                {
                }

                protected override IDisposable RunCore(SelectorWithSubscriptionDelay parent)
                {
                    var gatekeeper = new SubscriptionDelayObserver(this, parent._source);

                    gatekeeper.SetFirst(parent._subscriptionDelay.SubscribeSafe(gatekeeper));

                    return gatekeeper;
                }

                /// <summary>
                /// Observes the subscription delay and, when it signals, subscribes the parent sink
                /// to the source. Disposing it disposes whatever subscription it currently holds.
                /// </summary>
                private sealed class SubscriptionDelayObserver : IObserver<TDelay>, IDisposable
                {
                    private readonly _ _parent;
                    private readonly IObservable<TSource> _source;

                    // First the subscription to the delay, later the subscription to the source.
                    private IDisposable _subscription;

                    public SubscriptionDelayObserver(_ parent, IObservable<TSource> source)
                    {
                        _parent = parent;
                        _source = source;
                    }

                    /// <summary>Records the subscription to the subscription-delay observable.</summary>
                    internal void SetFirst(IDisposable d) => Disposable.TrySetSingle(ref _subscription, d);

                    /// <summary>A value from the subscription delay starts the source subscription.</summary>
                    public void OnNext(TDelay value) => SubscribeToSource();

                    /// <summary>An error in the subscription delay is forwarded downstream through the parent sink.</summary>
                    public void OnError(Exception error) => _parent.ForwardOnError(error);

                    /// <summary>Completion of the subscription delay starts the source subscription as well.</summary>
                    public void OnCompleted() => SubscribeToSource();

                    public void Dispose() => Disposable.TryDispose(ref _subscription);

                    // Subscribes the parent sink to the source and stores the resulting subscription
                    // in place of the one currently held.
                    private void SubscribeToSource()
                    {
                        Disposable.TrySetSerial(ref _subscription, _source.SubscribeSafe(_parent));
                    }
                }
            }
        }
    }
}