<PackageReference Include="System.Reactive" Version="6.0.0-preview.9" />

Delay<TSource, TDelay>

static class Delay<TSource, TDelay>
using System.Reactive.Disposables;
using System.Runtime.CompilerServices;

// The decompiled form of this file carried explicit [Nullable]/[NullableContext]
// attributes. Those attributes are reserved for the compiler and cannot be applied
// by hand, so they are replaced by the directive below; the former
// NullableContext(1) on the outer class shows the code was compiled with nullable
// reference types enabled and unannotated (non-nullable) members.
#nullable enable

namespace System.Reactive.Linq.ObservableImpl
{
    /// <summary>
    /// Implementation of a delay operator in which every source element is held back
    /// until an observable produced for that element by a selector function signals.
    /// </summary>
    /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
    /// <typeparam name="TDelay">Element type of the delay sequences; the values themselves are never read.</typeparam>
    internal static class Delay<TSource, TDelay>
    {
        /// <summary>
        /// Common producer for both delay variants. <typeparamref name="TParent"/> is the
        /// concrete producer type, handed to the sink so that the sink can decide how to
        /// subscribe to the source.
        /// </summary>
        internal abstract class Base<TParent> : Producer<TSource, Base<TParent>._>
            where TParent : Base<TParent>
        {
            /// <summary>The sequence whose elements are delayed.</summary>
            protected readonly IObservable<TSource> _source;

            protected Base(IObservable<TSource> source)
            {
                _source = source;
            }

            /// <summary>
            /// Sink that receives source elements, starts one delay subscription per element
            /// and forwards the element downstream once its delay signals.
            /// </summary>
            internal abstract class _ : IdentitySink<TSource>
            {
                // One entry per element that is still waiting for its delay to signal.
                private readonly CompositeDisposable _delays = new CompositeDisposable();

                // Guards every call that forwards a notification downstream from this class
                // and from DelayObserver, plus the completion bookkeeping.
                private readonly object _gate = new object();

                private readonly Func<TSource, IObservable<TDelay>> _delaySelector;

                // Set once the source has completed; completion is forwarded only after
                // the last pending delay has drained (see CheckDone).
                private bool _atEnd;

                // Whatever RunCore returned; disposed on source completion and on Dispose.
                private SingleAssignmentDisposableValue _subscription;

                protected _(Func<TSource, IObservable<TDelay>> delaySelector, IObserver<TSource> observer)
                    : base(observer)
                {
                    _delaySelector = delaySelector;
                }

                /// <summary>
                /// Starts the sink by storing the disposable returned from <see cref="RunCore"/>.
                /// </summary>
                /// <param name="parent">The producer that created this sink.</param>
                public void Run(TParent parent)
                {
                    _atEnd = false;
                    _subscription.Disposable = RunCore(parent);
                }

                /// <summary>
                /// On a disposing call, releases the upstream subscription and all pending
                /// delay subscriptions before delegating to the base implementation.
                /// </summary>
                protected override void Dispose(bool disposing)
                {
                    if (disposing)
                    {
                        _subscription.Dispose();
                        _delays.Dispose();
                    }

                    base.Dispose(disposing);
                }

                /// <summary>
                /// Performs the variant-specific subscription and returns the disposable
                /// that represents it.
                /// </summary>
                protected abstract IDisposable RunCore(TParent parent);

                /// <summary>
                /// Obtains the delay sequence for <paramref name="value"/> and subscribes to it.
                /// An exception thrown by the selector is forwarded downstream as an error.
                /// </summary>
                public override void OnNext(TSource value)
                {
                    IObservable<TDelay> delay;
                    try
                    {
                        delay = _delaySelector(value);
                    }
                    catch (Exception error)
                    {
                        lock (_gate)
                        {
                            ForwardOnError(error);
                        }

                        return;
                    }

                    var pending = new DelayObserver(this, value);

                    // Register before subscribing so that a delay which signals during the
                    // subscribe call already finds itself in _delays when it removes itself.
                    _delays.Add(pending);
                    pending.SetResource(ObservableExtensions.SubscribeSafe<TDelay>(delay, pending));
                }

                /// <summary>Forwards a source error downstream under the gate.</summary>
                public override void OnError(Exception error)
                {
                    lock (_gate)
                    {
                        ForwardOnError(error);
                    }
                }

                /// <summary>
                /// Marks the source as finished, releases the upstream subscription and
                /// completes downstream if no delays are pending.
                /// </summary>
                public override void OnCompleted()
                {
                    lock (_gate)
                    {
                        _atEnd = true;
                        _subscription.Dispose();
                        CheckDone();
                    }
                }

                /// <summary>
                /// Forwards completion once the source has ended and no delay is pending.
                /// Every call site in this file holds <see cref="_gate"/> when calling this.
                /// </summary>
                private void CheckDone()
                {
                    if (_atEnd && _delays.Count == 0)
                    {
                        ForwardOnCompleted();
                    }
                }

                /// <summary>
                /// Observer of a single delay sequence; carries the source element that is
                /// released when the delay produces a value or completes.
                /// </summary>
                private sealed class DelayObserver : SafeObserver<TDelay>
                {
                    private readonly _ _parent;
                    private readonly TSource _value;

                    // Set by the first OnNext so that later delay notifications do not
                    // release the element a second time.
                    private bool _once;

                    public DelayObserver(_ parent, TSource value)
                    {
                        _parent = parent;
                        _value = value;
                    }

                    /// <summary>Releases the held element on the first value of the delay sequence.</summary>
                    public override void OnNext(TDelay value)
                    {
                        if (_once)
                        {
                            return;
                        }

                        _once = true;
                        Release();
                    }

                    /// <summary>Forwards an error of the delay sequence downstream under the parent's gate.</summary>
                    public override void OnError(Exception error)
                    {
                        lock (_parent._gate)
                        {
                            _parent.ForwardOnError(error);
                        }
                    }

                    /// <summary>
                    /// Releases the held element when the delay sequence completes without
                    /// having produced a value.
                    /// </summary>
                    public override void OnCompleted()
                    {
                        if (_once)
                        {
                            return;
                        }

                        Release();
                    }

                    /// <summary>
                    /// Emits the held element, unregisters this observer from the pending set
                    /// and lets the parent complete if this was the last one.
                    /// </summary>
                    private void Release()
                    {
                        lock (_parent._gate)
                        {
                            _parent.ForwardOnNext(_value);

                            // NOTE(review): CompositeDisposable.Remove is documented to also
                            // dispose the removed item, which would end this delay subscription.
                            _parent._delays.Remove(this);
                            _parent.CheckDone();
                        }
                    }
                }
            }
        }

        /// <summary>
        /// Delay variant that subscribes to the source immediately.
        /// </summary>
        internal class Selector : Base<Selector>
        {
            private readonly Func<TSource, IObservable<TDelay>> _delaySelector;

            public Selector(IObservable<TSource> source, Func<TSource, IObservable<TDelay>> delaySelector)
                : base(source)
            {
                _delaySelector = delaySelector;
            }

            protected override Base<Selector>._ CreateSink(IObserver<TSource> observer)
            {
                return new _(_delaySelector, observer);
            }

            protected override void Run(Base<Selector>._ sink)
            {
                sink.Run(this);
            }

            /// <summary>Sink whose start-up consists of subscribing itself to the source.</summary>
            private new sealed class _ : Base<Selector>._
            {
                public _(Func<TSource, IObservable<TDelay>> delaySelector, IObserver<TSource> observer)
                    : base(delaySelector, observer)
                {
                }

                protected override IDisposable RunCore(Selector parent)
                {
                    return ObservableExtensions.SubscribeSafe<TSource>(parent._source, this);
                }
            }
        }

        /// <summary>
        /// Delay variant that postpones the subscription to the source until a separate
        /// subscription-delay sequence produces a value or completes.
        /// </summary>
        internal sealed class SelectorWithSubscriptionDelay : Base<SelectorWithSubscriptionDelay>
        {
            private readonly IObservable<TDelay> _subscriptionDelay;
            private readonly Func<TSource, IObservable<TDelay>> _delaySelector;

            public SelectorWithSubscriptionDelay(
                IObservable<TSource> source,
                IObservable<TDelay> subscriptionDelay,
                Func<TSource, IObservable<TDelay>> delaySelector)
                : base(source)
            {
                _subscriptionDelay = subscriptionDelay;
                _delaySelector = delaySelector;
            }

            protected override Base<SelectorWithSubscriptionDelay>._ CreateSink(IObserver<TSource> observer)
            {
                return new _(_delaySelector, observer);
            }

            protected override void Run(Base<SelectorWithSubscriptionDelay>._ sink)
            {
                sink.Run(this);
            }

            /// <summary>
            /// Sink whose start-up subscribes to the subscription-delay sequence instead of
            /// the source; the source is subscribed later by <see cref="SubscriptionDelayObserver"/>.
            /// </summary>
            private new sealed class _ : Base<SelectorWithSubscriptionDelay>._
            {
                public _(Func<TSource, IObservable<TDelay>> delaySelector, IObserver<TSource> observer)
                    : base(delaySelector, observer)
                {
                }

                protected override IDisposable RunCore(SelectorWithSubscriptionDelay parent)
                {
                    var gatekeeper = new SubscriptionDelayObserver(this, parent._source);
                    gatekeeper.SetFirst(
                        ObservableExtensions.SubscribeSafe<TDelay>(parent._subscriptionDelay, gatekeeper));

                    // The observer is returned as the disposable: disposing it releases
                    // whichever subscription it currently holds.
                    return gatekeeper;
                }

                /// <summary>
                /// Observes the subscription-delay sequence and subscribes the parent sink
                /// to the source when that sequence produces a value or completes.
                /// </summary>
                private sealed class SubscriptionDelayObserver : IObserver<TDelay>, IDisposable
                {
                    private readonly _ _parent;
                    private readonly IObservable<TSource> _source;

                    // Holds the subscription-delay subscription first and the source
                    // subscription afterwards.
                    private SerialDisposableValue _subscription;

                    public SubscriptionDelayObserver(_ parent, IObservable<TSource> source)
                    {
                        _parent = parent;
                        _source = source;
                    }

                    /// <summary>
                    /// Hands over the subscription-delay subscription.
                    /// </summary>
                    /// <remarks>
                    /// NOTE(review): presumably TrySetFirst stores <paramref name="d"/> only if
                    /// nothing has been assigned yet (the delay may already have signalled during
                    /// the subscribe call) — confirm against SerialDisposableValue. The result
                    /// is intentionally ignored, as in the original.
                    /// </remarks>
                    internal void SetFirst(IDisposable d)
                    {
                        _subscription.TrySetFirst(d);
                    }

                    public void OnNext(TDelay value)
                    {
                        SubscribeToSource();
                    }

                    /// <summary>Forwards an error of the subscription-delay sequence downstream.</summary>
                    public void OnError(Exception error)
                    {
                        _parent.ForwardOnError(error);
                    }

                    public void OnCompleted()
                    {
                        SubscribeToSource();
                    }

                    public void Dispose()
                    {
                        _subscription.Dispose();
                    }

                    /// <summary>
                    /// Subscribes the parent sink to the source and stores that subscription
                    /// as the current one.
                    /// </summary>
                    private void SubscribeToSource()
                    {
                        _subscription.Disposable =
                            ObservableExtensions.SubscribeSafe<TSource>(_source, _parent);
                    }
                }
            }
        }
    }
}