<PackageReference Include="System.Reactive" Version="6.1.0" />

Buffer<TSource, TBufferClosing>

static class Buffer<TSource, TBufferClosing>
// Nullability is expressed through the nullable context rather than through
// hand-written [NullableContext]/[Nullable] attributes: those attributes are
// reserved for the compiler and cannot be applied explicitly in source.
#nullable enable

using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Runtime.CompilerServices;

namespace System.Reactive.Linq.ObservableImpl
{
    /// <summary>
    /// Container for the two buffering producers that collect source elements into
    /// lists and hand a list downstream whenever a separate "closing" sequence signals.
    /// </summary>
    /// <typeparam name="TSource">Element type of the buffered source sequence.</typeparam>
    /// <typeparam name="TBufferClosing">
    /// Element type of the sequence whose notifications close the current buffer.
    /// The values themselves are never inspected.
    /// </typeparam>
    internal static class Buffer<TSource, TBufferClosing>
    {
        /// <summary>
        /// Producer whose closing sequence is obtained from a factory delegate; a fresh
        /// closing sequence is requested after every buffer that is closed.
        /// </summary>
        internal sealed class Selector : Producer<IList<TSource>, Selector._>
        {
            /// <summary>
            /// Sink that accumulates source elements and emits the accumulated list each
            /// time the current closing sequence produces a value or completes.
            /// </summary>
            internal sealed class _ : Sink<TSource, IList<TSource>>
            {
                /// <summary>
                /// Observer subscribed to one closing sequence. Both a value and a
                /// completion close the current buffer; an error is routed to the parent sink.
                /// </summary>
                private sealed class BufferClosingObserver : SafeObserver<TBufferClosing>
                {
                    private readonly _ _parent;

                    /// <summary>Creates an observer that reports to <paramref name="parent"/>.</summary>
                    /// <param name="parent">The sink whose buffer this observer closes.</param>
                    public BufferClosingObserver(_ parent)
                    {
                        _parent = parent;
                    }

                    /// <summary>Closes the parent's current buffer; <paramref name="value"/> is ignored.</summary>
                    public override void OnNext(TBufferClosing value)
                    {
                        _parent.CloseBuffer(this);
                    }

                    /// <summary>Passes the closing sequence's error to the parent sink's <c>OnError</c>.</summary>
                    public override void OnError(Exception error)
                    {
                        _parent.OnError(error);
                    }

                    /// <summary>Treats completion of the closing sequence like a value: the buffer is closed.</summary>
                    public override void OnCompleted()
                    {
                        _parent.CloseBuffer(this);
                    }
                }

                // Guards _buffer and every Forward* call made by this sink.
                private readonly object _gate = new object();

                // Every CreateBufferClose call is funnelled through this lock.
                // NOTE(review): presumably AsyncLock queues re-entrant requests instead of
                // recursing - confirm against AsyncLock.Wait.
                private readonly AsyncLock _bufferGate = new AsyncLock();

                private readonly Func<IObservable<TBufferClosing>> _bufferClosingSelector;

                private List<TSource> _buffer = new List<TSource>();

                // Holds the observer for the current closing sequence so Dispose can release it.
                private SerialDisposableValue _bufferClosingSerialDisposable;

                /// <summary>Creates the sink, copying the closing-sequence factory from <paramref name="parent"/>.</summary>
                /// <param name="parent">Producer that supplies the closing-sequence factory.</param>
                /// <param name="observer">Observer passed to the base sink.</param>
                public _(Selector parent, IObserver<IList<TSource>> observer)
                    : base(observer)
                {
                    _bufferClosingSelector = parent._bufferClosingSelector;
                }

                /// <summary>
                /// Runs the base sink against <paramref name="source"/> and then requests the
                /// first closing sequence.
                /// </summary>
                public override void Run(IObservable<TSource> source)
                {
                    base.Run(source);

                    // The sink is passed as state so the lambda captures nothing.
                    _bufferGate.Wait(this, self => self.CreateBufferClose());
                }

                /// <summary>
                /// Disposes the current closing-sequence observer (only when
                /// <paramref name="disposing"/> is true), then the base sink.
                /// </summary>
                protected override void Dispose(bool disposing)
                {
                    if (disposing)
                    {
                        _bufferClosingSerialDisposable.Dispose();
                    }

                    base.Dispose(disposing);
                }

                /// <summary>
                /// Invokes the factory and subscribes a new <see cref="BufferClosingObserver"/>
                /// to the sequence it returns. If the factory throws, the exception is
                /// forwarded as an error under the gate and no subscription is made.
                /// </summary>
                private void CreateBufferClose()
                {
                    IObservable<TBufferClosing> closingSequence;
                    try
                    {
                        closingSequence = _bufferClosingSelector();
                    }
                    catch (Exception error)
                    {
                        lock (_gate)
                        {
                            ForwardOnError(error);
                        }

                        return;
                    }

                    var closingObserver = new BufferClosingObserver(this);

                    // Register the observer before subscribing so it is already tracked
                    // when the subscription starts delivering notifications.
                    _bufferClosingSerialDisposable.Disposable = closingObserver;
                    closingObserver.SetResource(
                        ObservableExtensions.SubscribeSafe<TBufferClosing>(closingSequence, closingObserver));
                }

                /// <summary>
                /// Disposes the closing subscription that fired, swaps in an empty buffer,
                /// forwards the filled one, and requests the next closing sequence.
                /// </summary>
                /// <param name="closingSubscription">The closing observer that triggered the close.</param>
                private void CloseBuffer(IDisposable closingSubscription)
                {
                    closingSubscription.Dispose();

                    lock (_gate)
                    {
                        var completedBuffer = _buffer;
                        _buffer = new List<TSource>();
                        ForwardOnNext(completedBuffer);
                    }

                    _bufferGate.Wait(this, self => self.CreateBufferClose());
                }

                /// <summary>Appends <paramref name="value"/> to the current buffer under the gate.</summary>
                public override void OnNext(TSource value)
                {
                    lock (_gate)
                    {
                        _buffer.Add(value);
                    }
                }

                /// <summary>Clears the current buffer and forwards <paramref name="error"/>.</summary>
                public override void OnError(Exception error)
                {
                    lock (_gate)
                    {
                        _buffer.Clear();
                        ForwardOnError(error);
                    }
                }

                /// <summary>Forwards the current (possibly empty) buffer and then completion.</summary>
                public override void OnCompleted()
                {
                    lock (_gate)
                    {
                        ForwardOnNext(_buffer);
                        ForwardOnCompleted();
                    }
                }
            }

            private readonly IObservable<TSource> _source;
            private readonly Func<IObservable<TBufferClosing>> _bufferClosingSelector;

            /// <summary>Stores the source sequence and the closing-sequence factory.</summary>
            /// <param name="source">Sequence whose elements are buffered.</param>
            /// <param name="bufferClosingSelector">Factory invoked once per buffer to obtain its closing sequence.</param>
            public Selector(IObservable<TSource> source, Func<IObservable<TBufferClosing>> bufferClosingSelector)
            {
                _source = source;
                _bufferClosingSelector = bufferClosingSelector;
            }

            /// <summary>Creates a sink bound to this producer and <paramref name="observer"/>.</summary>
            protected override _ CreateSink(IObserver<IList<TSource>> observer)
            {
                return new _(this, observer);
            }

            /// <summary>Runs <paramref name="sink"/> against the stored source sequence.</summary>
            protected override void Run(_ sink)
            {
                sink.Run(_source);
            }
        }

        /// <summary>
        /// Producer that uses a single, fixed boundary sequence: every value it produces
        /// closes the current buffer.
        /// </summary>
        internal sealed class Boundaries : Producer<IList<TSource>, Boundaries._>
        {
            /// <summary>
            /// Sink that accumulates source elements and emits the accumulated list on each
            /// boundary value.
            /// </summary>
            internal sealed class _ : Sink<TSource, IList<TSource>>
            {
                /// <summary>
                /// Observer of the boundary sequence. A value closes the buffer; error and
                /// completion are delegated to the parent sink's own handlers.
                /// </summary>
                private sealed class BufferClosingObserver : IObserver<TBufferClosing>
                {
                    private readonly _ _parent;

                    /// <summary>Creates an observer that reports to <paramref name="parent"/>.</summary>
                    /// <param name="parent">The sink whose buffer this observer closes.</param>
                    public BufferClosingObserver(_ parent)
                    {
                        _parent = parent;
                    }

                    /// <summary>
                    /// Under the parent's gate, replaces the parent's buffer with an empty list
                    /// and forwards the filled one; <paramref name="value"/> is ignored.
                    /// </summary>
                    public void OnNext(TBufferClosing value)
                    {
                        lock (_parent._gate)
                        {
                            var completedBuffer = _parent._buffer;
                            _parent._buffer = new List<TSource>();
                            _parent.ForwardOnNext(completedBuffer);
                        }
                    }

                    /// <summary>Passes the boundary sequence's error to the parent sink's <c>OnError</c>.</summary>
                    public void OnError(Exception error)
                    {
                        _parent.OnError(error);
                    }

                    /// <summary>Passes the boundary sequence's completion to the parent sink's <c>OnCompleted</c>.</summary>
                    public void OnCompleted()
                    {
                        _parent.OnCompleted();
                    }
                }

                // Guards _buffer and every Forward* call made by this sink.
                private readonly object _gate = new object();

                private List<TSource> _buffer = new List<TSource>();

                // Holds the subscription to the boundary sequence so Dispose can release it.
                private SingleAssignmentDisposableValue _boundariesDisposable;

                /// <summary>Creates the sink for <paramref name="observer"/>.</summary>
                public _(IObserver<IList<TSource>> observer)
                    : base(observer)
                {
                }

                /// <summary>
                /// Runs the sink against the parent's source sequence, then subscribes a
                /// <see cref="BufferClosingObserver"/> to the parent's boundary sequence.
                /// </summary>
                /// <param name="parent">Producer that supplies the source and boundary sequences.</param>
                public void Run(Boundaries parent)
                {
                    Run(parent._source);

                    _boundariesDisposable.Disposable =
                        ObservableExtensions.SubscribeSafe<TBufferClosing>(
                            parent._bufferBoundaries,
                            new BufferClosingObserver(this));
                }

                /// <summary>
                /// Disposes the boundary subscription (only when <paramref name="disposing"/>
                /// is true), then the base sink.
                /// </summary>
                protected override void Dispose(bool disposing)
                {
                    if (disposing)
                    {
                        _boundariesDisposable.Dispose();
                    }

                    base.Dispose(disposing);
                }

                /// <summary>Appends <paramref name="value"/> to the current buffer under the gate.</summary>
                public override void OnNext(TSource value)
                {
                    lock (_gate)
                    {
                        _buffer.Add(value);
                    }
                }

                /// <summary>Clears the current buffer and forwards <paramref name="error"/>.</summary>
                public override void OnError(Exception error)
                {
                    lock (_gate)
                    {
                        _buffer.Clear();
                        ForwardOnError(error);
                    }
                }

                /// <summary>Forwards the current (possibly empty) buffer and then completion.</summary>
                public override void OnCompleted()
                {
                    lock (_gate)
                    {
                        ForwardOnNext(_buffer);
                        ForwardOnCompleted();
                    }
                }
            }

            private readonly IObservable<TSource> _source;
            private readonly IObservable<TBufferClosing> _bufferBoundaries;

            /// <summary>Stores the source sequence and the boundary sequence.</summary>
            /// <param name="source">Sequence whose elements are buffered.</param>
            /// <param name="bufferBoundaries">Sequence whose values close the current buffer.</param>
            public Boundaries(IObservable<TSource> source, IObservable<TBufferClosing> bufferBoundaries)
            {
                _source = source;
                _bufferBoundaries = bufferBoundaries;
            }

            /// <summary>Creates a sink for <paramref name="observer"/>.</summary>
            protected override _ CreateSink(IObserver<IList<TSource>> observer)
            {
                return new _(observer);
            }

            /// <summary>Runs <paramref name="sink"/> with this producer as its parent.</summary>
            protected override void Run(_ sink)
            {
                sink.Run(this);
            }
        }
    }
}