<PackageReference Include="System.Reactive" Version="4.1.6" />

Buffer<TSource, TBufferClosing>

static class Buffer<TSource, TBufferClosing>
using System.Collections.Generic; using System.Reactive.Concurrency; using System.Reactive.Disposables; namespace System.Reactive.Linq.ObservableImpl { internal static class Buffer<TSource, TBufferClosing> { internal sealed class Selector : Producer<IList<TSource>, Selector._> { internal sealed class _ : Sink<TSource, IList<TSource>> { private sealed class BufferClosingObserver : SafeObserver<TBufferClosing> { private readonly _ _parent; public BufferClosingObserver(_ parent) { _parent = parent; } public override void OnNext(TBufferClosing value) { _parent.CloseBuffer(this); } public override void OnError(Exception error) { _parent.OnError(error); } public override void OnCompleted() { _parent.CloseBuffer(this); } } private readonly object _gate = new object(); private readonly AsyncLock _bufferGate = new AsyncLock(); private IDisposable _bufferClosingSerialDisposable; private readonly Func<IObservable<TBufferClosing>> _bufferClosingSelector; private IList<TSource> _buffer; public _(Selector parent, IObserver<IList<TSource>> observer) : base(observer) { _bufferClosingSelector = parent._bufferClosingSelector; } public override void Run(IObservable<TSource> source) { _buffer = new List<TSource>(); base.Run(source); _bufferGate.Wait<_>(this, (Action<_>)delegate(_ this) { this.CreateBufferClose(); }); } protected override void Dispose(bool disposing) { if (disposing) Disposable.TryDispose(ref _bufferClosingSerialDisposable); base.Dispose(disposing); } private void CreateBufferClose() { IObservable<TBufferClosing> observable = null; try { observable = _bufferClosingSelector(); } catch (Exception error) { lock (_gate) { ForwardOnError(error); } return; } BufferClosingObserver bufferClosingObserver = new BufferClosingObserver(this); Disposable.TrySetSerial(ref _bufferClosingSerialDisposable, bufferClosingObserver); bufferClosingObserver.SetResource(ObservableExtensions.SubscribeSafe<TBufferClosing>(observable, (IObserver<TBufferClosing>)bufferClosingObserver)); } private void CloseBuffer(IDisposable closingSubscription) { closingSubscription.Dispose(); lock (_gate) { IList<TSource> buffer = _buffer; _buffer = new List<TSource>(); ForwardOnNext(buffer); } _bufferGate.Wait<_>(this, (Action<_>)delegate(_ this) { this.CreateBufferClose(); }); } public override void OnNext(TSource value) { lock (_gate) { _buffer.Add(value); } } public override void OnError(Exception error) { lock (_gate) { _buffer.Clear(); ForwardOnError(error); } } public override void OnCompleted() { lock (_gate) { ForwardOnNext(_buffer); ForwardOnCompleted(); } } } private readonly IObservable<TSource> _source; private readonly Func<IObservable<TBufferClosing>> _bufferClosingSelector; public Selector(IObservable<TSource> source, Func<IObservable<TBufferClosing>> bufferClosingSelector) { _source = source; _bufferClosingSelector = bufferClosingSelector; } protected override _ CreateSink(IObserver<IList<TSource>> observer) { return new _(this, observer); } protected override void Run(_ sink) { sink.Run(_source); } } internal sealed class Boundaries : Producer<IList<TSource>, Boundaries._> { internal sealed class _ : Sink<TSource, IList<TSource>> { private sealed class BufferClosingObserver : IObserver<TBufferClosing> { private readonly _ _parent; public BufferClosingObserver(_ parent) { _parent = parent; } public void OnNext(TBufferClosing value) { lock (_parent._gate) { IList<TSource> buffer = _parent._buffer; _parent._buffer = new List<TSource>(); _parent.ForwardOnNext(buffer); } } public void OnError(Exception error) { _parent.OnError(error); } public void OnCompleted() { _parent.OnCompleted(); } } private readonly object _gate = new object(); private IList<TSource> _buffer; private IDisposable _boundariesDisposable; public _(IObserver<IList<TSource>> observer) : base(observer) { } public void Run(Boundaries parent) { _buffer = new List<TSource>(); Run(parent._source); Disposable.SetSingle(ref _boundariesDisposable, ObservableExtensions.SubscribeSafe<TBufferClosing>(parent._bufferBoundaries, (IObserver<TBufferClosing>)new BufferClosingObserver(this))); } protected override void Dispose(bool disposing) { if (disposing) Disposable.TryDispose(ref _boundariesDisposable); base.Dispose(disposing); } public override void OnNext(TSource value) { lock (_gate) { _buffer.Add(value); } } public override void OnError(Exception error) { lock (_gate) { _buffer.Clear(); ForwardOnError(error); } } public override void OnCompleted() { lock (_gate) { ForwardOnNext(_buffer); ForwardOnCompleted(); } } } private readonly IObservable<TSource> _source; private readonly IObservable<TBufferClosing> _bufferBoundaries; public Boundaries(IObservable<TSource> source, IObservable<TBufferClosing> bufferBoundaries) { _source = source; _bufferBoundaries = bufferBoundaries; } protected override _ CreateSink(IObserver<IList<TSource>> observer) { return new _(observer); } protected override void Run(_ sink) { sink.Run(this); } } } }