<PackageReference Include="System.Reactive" Version="6.0.0-preview.16" />

AppendPrepend<TSource>

static class AppendPrepend<TSource>
#nullable enable

using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Runtime.CompilerServices;

namespace System.Reactive.Linq.ObservableImpl
{
    /// <summary>
    /// Holder for the observable implementations that emit extra values before
    /// (prepend) and/or after (append) the elements of a source sequence.
    /// </summary>
    /// <typeparam name="TSource">Element type of the source sequence and of the extra values.</typeparam>
    internal static class AppendPrepend<TSource>
    {
        /// <summary>
        /// An observable that can have a further value appended or prepended,
        /// returning a new observable that carries the combined set of values.
        /// </summary>
        internal interface IAppendPrepend : IObservable<TSource>
        {
            /// <summary>Scheduler associated with this operator instance.</summary>
            IScheduler Scheduler { get; }

            /// <summary>Returns an observable that additionally emits <paramref name="value"/> after the existing appended values.</summary>
            IAppendPrepend Append(TSource value);

            /// <summary>Returns an observable that additionally emits <paramref name="value"/> before the existing prepended values.</summary>
            IAppendPrepend Prepend(TSource value);
        }

        /// <summary>
        /// Base for operators that hold exactly one extra value, either appended or prepended.
        /// Adding a second value switches to <see cref="LongRunning"/> or <see cref="Recursive"/>.
        /// </summary>
        /// <typeparam name="TSink">Sink type created per subscription.</typeparam>
        internal abstract class SingleBase<TSink> : Producer<TSource, TSink>, IAppendPrepend
            where TSink : IDisposable
        {
            protected readonly IObservable<TSource> _source;
            protected readonly TSource _value;

            // true: _value is emitted after the source; false: before it.
            protected readonly bool _append;

            public abstract IScheduler Scheduler { get; }

            /// <summary>Stores the source, the single extra value and whether it is appended or prepended.</summary>
            public SingleBase(IObservable<TSource> source, TSource value, bool append)
            {
                _source = source;
                _value = value;
                _append = append;
            }

            /// <summary>
            /// Combines the held value with a newly appended <paramref name="value"/>.
            /// If the held value is itself an append, both go on the append chain (held value first);
            /// otherwise the held value becomes the sole prepend and <paramref name="value"/> the sole append.
            /// </summary>
            public IAppendPrepend Append(TSource value)
            {
                var held = new Node<TSource>(_value);

                return _append
                    ? CreateAppendPrepend(null, new Node<TSource>(held, value))
                    : CreateAppendPrepend(held, new Node<TSource>(value));
            }

            /// <summary>
            /// Combines the held value with a newly prepended <paramref name="value"/>.
            /// If the held value is itself a prepend, both go on the prepend chain (new value at the head);
            /// otherwise the held value becomes the sole append and <paramref name="value"/> the sole prepend.
            /// </summary>
            public IAppendPrepend Prepend(TSource value)
            {
                var held = new Node<TSource>(_value);

                return _append
                    ? CreateAppendPrepend(new Node<TSource>(value), held)
                    : CreateAppendPrepend(new Node<TSource>(held, value), null);
            }

            /// <summary>
            /// Picks the multi-value implementation: <see cref="LongRunning"/> when
            /// <see cref="Scheduler"/> implements <see cref="ISchedulerLongRunning"/>,
            /// <see cref="Recursive"/> otherwise.
            /// </summary>
            private IAppendPrepend CreateAppendPrepend(Node<TSource>? prepend, Node<TSource>? append)
            {
                if (Scheduler is ISchedulerLongRunning longRunning)
                {
                    return new LongRunning(_source, prepend, append, Scheduler, longRunning);
                }

                return new Recursive(_source, prepend, append, Scheduler);
            }
        }

        /// <summary>Single extra value, emitted through an explicitly supplied scheduler.</summary>
        internal sealed class SingleValue : SingleBase<SingleValue._>
        {
            /// <summary>Per-subscription sink for <see cref="SingleValue"/>.</summary>
            internal sealed class _ : IdentitySink<TSource>
            {
                private readonly IObservable<TSource> _source;
                private readonly TSource _value;
                private readonly IScheduler _scheduler;
                private readonly bool _append;

                // Holds the scheduled append work so Dispose can release it.
                private SingleAssignmentDisposableValue _schedulerDisposable;

                public _(SingleValue parent, IObserver<TSource> observer)
                    : base(observer)
                {
                    _source = parent._source;
                    _value = parent._value;
                    _scheduler = parent.Scheduler;
                    _append = parent._append;
                }

                /// <summary>
                /// Append mode: subscribes to the source directly.
                /// Prepend mode: schedules <see cref="PrependValue"/>, which emits the value and then subscribes.
                /// </summary>
                public void Run()
                {
                    IDisposable upstream;

                    if (_append)
                    {
                        upstream = _source.SubscribeSafe(this);
                    }
                    else
                    {
                        // Fully qualified: inside this nested class the simple name "Scheduler"
                        // resolves to the enclosing SingleValue.Scheduler property.
                        upstream = System.Reactive.Concurrency.Scheduler.ScheduleAction<_>(_scheduler, this, (Func<_, IDisposable>)PrependValue);
                    }

                    SetUpstream(upstream);
                }

                // Scheduled in prepend mode: forward the value, then subscribe to the source.
                private static IDisposable PrependValue(_ sink)
                {
                    sink.ForwardOnNext(sink._value);
                    return sink._source.SubscribeSafe(sink);
                }

                /// <summary>
                /// In append mode, schedules the trailing value followed by completion;
                /// in prepend mode, forwards completion directly.
                /// </summary>
                public override void OnCompleted()
                {
                    if (!_append)
                    {
                        ForwardOnCompleted();
                        return;
                    }

                    _schedulerDisposable.Disposable =
                        System.Reactive.Concurrency.Scheduler.ScheduleAction<_>(_scheduler, this, (Action<_>)AppendValue);
                }

                // Scheduled in append mode: forward the value, then complete.
                private static void AppendValue(_ sink)
                {
                    sink.ForwardOnNext(sink._value);
                    sink.ForwardOnCompleted();
                }

                protected override void Dispose(bool disposing)
                {
                    if (disposing)
                    {
                        _schedulerDisposable.Dispose();
                    }

                    base.Dispose(disposing);
                }
            }

            public override IScheduler Scheduler { get; }

            public SingleValue(IObservable<TSource> source, TSource value, IScheduler scheduler, bool append)
                : base(source, value, append)
            {
                Scheduler = scheduler;
            }

            protected override _ CreateSink(IObserver<TSource> observer) => new _(this, observer);

            protected override void Run(_ sink) => sink.Run();
        }

        /// <summary>
        /// Multi-value implementation for schedulers without long-running support:
        /// each extra value is emitted by its own scheduled work item.
        /// </summary>
        private sealed class Recursive : Producer<TSource, Recursive._>, IAppendPrepend
        {
            /// <summary>Per-subscription sink for <see cref="Recursive"/>.</summary>
            internal sealed class _ : IdentitySink<TSource>
            {
                private readonly IObservable<TSource> _source;
                private readonly Node<TSource>? _appends;
                private readonly IScheduler _scheduler;

                // Next prepend node to emit; advanced along Parent links until null.
                private Node<TSource>? _currentPrependNode;

                // Appended values in emission order; populated in OnCompleted.
                private TSource[]? _appendArray;
                private int _currentAppendIndex;

                // Set by Dispose; checked by the scheduled work items before they emit.
                private volatile bool _disposed;

                public _(Recursive parent, IObserver<TSource> observer)
                    : base(observer)
                {
                    _source = parent._source;
                    _scheduler = parent.Scheduler;
                    _currentPrependNode = parent._prepends;
                    _appends = parent._appends;
                }

                /// <summary>Subscribes directly when there are no prepends; otherwise schedules the first prepend.</summary>
                public void Run()
                {
                    if (_currentPrependNode is null)
                    {
                        SetUpstream(_source.SubscribeSafe(this));
                        return;
                    }

                    // The IDisposable returned by Schedule is not retained;
                    // PrependValues checks _disposed instead.
                    _scheduler.Schedule<_>(this, static (innerScheduler, self) => self.PrependValues(innerScheduler));
                }

                /// <summary>Forwards completion directly when there are no appends; otherwise schedules the first append.</summary>
                public override void OnCompleted()
                {
                    if (_appends is null)
                    {
                        ForwardOnCompleted();
                        return;
                    }

                    _appendArray = _appends.ToReverseArray();

                    // As in Run, the returned IDisposable is not retained; AppendValues checks _disposed.
                    _scheduler.Schedule<_>(this, static (innerScheduler, self) => self.AppendValues(innerScheduler));
                }

                protected override void Dispose(bool disposing)
                {
                    if (disposing)
                    {
                        _disposed = true;
                    }

                    base.Dispose(disposing);
                }

                // Emits one prepend value, then either reschedules itself for the next
                // node or, when the chain is exhausted, subscribes to the source.
                private IDisposable PrependValues(IScheduler scheduler)
                {
                    if (_disposed)
                    {
                        return Disposable.Empty;
                    }

                    // Non-null here: only scheduled while a prepend node remains.
                    var current = _currentPrependNode!;
                    ForwardOnNext(current.Value);
                    _currentPrependNode = current.Parent;

                    if (_currentPrependNode is null)
                    {
                        SetUpstream(_source.SubscribeSafe(this));
                    }
                    else
                    {
                        scheduler.Schedule<_>(this, static (innerScheduler, self) => self.PrependValues(innerScheduler));
                    }

                    return Disposable.Empty;
                }

                // Emits one append value, then either reschedules itself for the next
                // index or, after the last value, forwards completion.
                private IDisposable AppendValues(IScheduler scheduler)
                {
                    if (_disposed)
                    {
                        return Disposable.Empty;
                    }

                    // Non-null here: assigned in OnCompleted before this is first scheduled.
                    var values = _appendArray!;
                    ForwardOnNext(values[_currentAppendIndex]);
                    _currentAppendIndex++;

                    if (_currentAppendIndex == values.Length)
                    {
                        ForwardOnCompleted();
                    }
                    else
                    {
                        scheduler.Schedule<_>(this, static (innerScheduler, self) => self.AppendValues(innerScheduler));
                    }

                    return Disposable.Empty;
                }
            }

            private readonly IObservable<TSource> _source;
            private readonly Node<TSource>? _appends;
            private readonly Node<TSource>? _prepends;

            public IScheduler Scheduler { get; }

            public Recursive(IObservable<TSource> source, Node<TSource>? prepends, Node<TSource>? appends, IScheduler scheduler)
            {
                _source = source;
                _appends = appends;
                _prepends = prepends;
                Scheduler = scheduler;
            }

            /// <summary>Returns a new instance whose append chain has <paramref name="value"/> added as the newest node.</summary>
            public IAppendPrepend Append(TSource value) =>
                new Recursive(_source, _prepends, new Node<TSource>(_appends, value), Scheduler);

            /// <summary>Returns a new instance whose prepend chain has <paramref name="value"/> added as the new head.</summary>
            public IAppendPrepend Prepend(TSource value) =>
                new Recursive(_source, new Node<TSource>(_prepends, value), _appends, Scheduler);

            protected override _ CreateSink(IObserver<TSource> observer) => new _(this, observer);

            protected override void Run(_ sink) => sink.Run();
        }

        /// <summary>
        /// Multi-value implementation for schedulers implementing <see cref="ISchedulerLongRunning"/>:
        /// all prepends (or all appends) are emitted in a loop inside one long-running work item.
        /// </summary>
        private sealed class LongRunning : Producer<TSource, LongRunning._>, IAppendPrepend
        {
            /// <summary>Per-subscription sink for <see cref="LongRunning"/>.</summary>
            internal sealed class _ : IdentitySink<TSource>
            {
                private readonly IObservable<TSource> _source;
                private readonly Node<TSource>? _prepends;
                private readonly Node<TSource>? _appends;
                private readonly ISchedulerLongRunning _scheduler;

                // Holds the prepend work item first and later the append work item.
                private SerialDisposableValue _schedulerDisposable;

                public _(LongRunning parent, IObserver<TSource> observer)
                    : base(observer)
                {
                    _source = parent._source;
                    _scheduler = parent._longRunningScheduler;
                    _prepends = parent._prepends;
                    _appends = parent._appends;
                }

                /// <summary>Subscribes directly when there are no prepends; otherwise starts the long-running prepend loop.</summary>
                public void Run()
                {
                    if (_prepends is null)
                    {
                        SetUpstream(_source.SubscribeSafe(this));
                        return;
                    }

                    var work = _scheduler.ScheduleLongRunning<_>(this, static (self, cancel) => self.PrependValues(cancel));
                    _schedulerDisposable.TrySetFirst(work);
                }

                /// <summary>Forwards completion directly when there are no appends; otherwise starts the long-running append loop.</summary>
                public override void OnCompleted()
                {
                    if (_appends is null)
                    {
                        ForwardOnCompleted();
                        return;
                    }

                    var work = _scheduler.ScheduleLongRunning<_>(this, static (self, cancel) => self.AppendValues(cancel));
                    _schedulerDisposable.Disposable = work;
                }

                protected override void Dispose(bool disposing)
                {
                    if (disposing)
                    {
                        _schedulerDisposable.Dispose();
                    }

                    base.Dispose(disposing);
                }

                // Emits every prepend value from the head of the chain along Parent links,
                // bailing out if cancelled, then subscribes to the source.
                private void PrependValues(ICancelable cancel)
                {
                    // Non-null here: Run only schedules this when _prepends is not null.
                    Node<TSource>? node = _prepends;

                    do
                    {
                        if (cancel.IsDisposed)
                        {
                            return;
                        }

                        ForwardOnNext(node!.Value);
                        node = node.Parent;
                    }
                    while (node is not null);

                    SetUpstream(_source.SubscribeSafe(this));
                }

                // Emits every append value in insertion order, bailing out if cancelled,
                // then forwards completion.
                private void AppendValues(ICancelable cancel)
                {
                    // Non-null here: OnCompleted only schedules this when _appends is not null.
                    var values = _appends!.ToReverseArray();
                    var index = 0;

                    do
                    {
                        if (cancel.IsDisposed)
                        {
                            return;
                        }

                        ForwardOnNext(values[index]);
                        index++;
                    }
                    while (index != values.Length);

                    ForwardOnCompleted();
                }
            }

            private readonly IObservable<TSource> _source;
            private readonly Node<TSource>? _appends;
            private readonly Node<TSource>? _prepends;
            private readonly ISchedulerLongRunning _longRunningScheduler;

            public IScheduler Scheduler { get; }

            public LongRunning(
                IObservable<TSource> source,
                Node<TSource>? prepends,
                Node<TSource>? appends,
                IScheduler scheduler,
                ISchedulerLongRunning longRunningScheduler)
            {
                _source = source;
                _appends = appends;
                _prepends = prepends;
                Scheduler = scheduler;
                _longRunningScheduler = longRunningScheduler;
            }

            /// <summary>Returns a new instance whose append chain has <paramref name="value"/> added as the newest node.</summary>
            public IAppendPrepend Append(TSource value) =>
                new LongRunning(_source, _prepends, new Node<TSource>(_appends, value), Scheduler, _longRunningScheduler);

            /// <summary>Returns a new instance whose prepend chain has <paramref name="value"/> added as the new head.</summary>
            public IAppendPrepend Prepend(TSource value) =>
                new LongRunning(_source, new Node<TSource>(_prepends, value), _appends, Scheduler, _longRunningScheduler);

            protected override _ CreateSink(IObserver<TSource> observer) => new _(this, observer);

            protected override void Run(_ sink) => sink.Run();
        }

        /// <summary>
        /// Immutable singly linked node. Each node points to the previously added node
        /// through <see cref="Parent"/> and records the chain length in <see cref="Count"/>.
        /// </summary>
        private sealed class Node<T>
        {
            public readonly Node<T>? Parent;
            public readonly T Value;

            // Number of nodes in the chain ending at this node (1 for a root node).
            public readonly int Count;

            /// <summary>Creates a root node (no parent, <see cref="Count"/> of 1).</summary>
            public Node(T value)
                : this(null, value)
            {
            }

            /// <summary>Creates a node linked to <paramref name="parent"/>.</summary>
            /// <exception cref="NotSupportedException">The parent chain already holds <see cref="int.MaxValue"/> nodes.</exception>
            public Node(Node<T>? parent, T value)
            {
                Parent = parent;
                Value = value;

                if (parent is null)
                {
                    Count = 1;
                    return;
                }

                if (parent.Count == int.MaxValue)
                {
                    // NOTE(review): the message consists solely of the limit value; any surrounding
                    // text may have been lost before this listing was produced - confirm upstream.
                    throw new NotSupportedException($"{int.MaxValue}");
                }

                Count = parent.Count + 1;
            }

            /// <summary>
            /// Copies the chain into an array filled from the last index down to 0 while
            /// walking <see cref="Parent"/> links, so the root node's value ends up at index 0.
            /// </summary>
            public T[] ToReverseArray()
            {
                var result = new T[Count];
                Node<T>? node = this;

                for (var index = Count - 1; index >= 0; index--)
                {
                    // Count equals the chain length, so node is non-null for every index.
                    result[index] = node!.Value;
                    node = node.Parent;
                }

                return result;
            }
        }

        /// <summary>Single extra value, emitted synchronously without scheduling.</summary>
        internal sealed class SingleImmediate : SingleBase<SingleImmediate._>
        {
            /// <summary>Per-subscription sink for <see cref="SingleImmediate"/>.</summary>
            internal sealed class _ : IdentitySink<TSource>
            {
                private readonly IObservable<TSource> _source;
                private readonly TSource _value;
                private readonly bool _append;

                public _(SingleImmediate parent, IObserver<TSource> observer)
                    : base(observer)
                {
                    _source = parent._source;
                    _value = parent._value;
                    _append = parent._append;
                }

                /// <summary>In prepend mode forwards the value first; then runs the sink against the source.</summary>
                public void Run()
                {
                    if (!_append)
                    {
                        ForwardOnNext(_value);
                    }

                    Run(_source);
                }

                /// <summary>In append mode forwards the value before forwarding completion.</summary>
                public override void OnCompleted()
                {
                    if (_append)
                    {
                        ForwardOnNext(_value);
                    }

                    ForwardOnCompleted();
                }
            }

            public override IScheduler Scheduler => ImmediateScheduler.Instance;

            public SingleImmediate(IObservable<TSource> source, TSource value, bool append)
                : base(source, value, append)
            {
            }

            protected override _ CreateSink(IObserver<TSource> observer) => new _(this, observer);

            protected override void Run(_ sink) => sink.Run();
        }
    }
}