// StreamPipeWriter
using System.Buffers;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
namespace System.IO.Pipelines
{
internal sealed class StreamPipeWriter : PipeWriter
{
// Segment-pool sizing constants. The constructor creates the segment stack with the
// same value as InitialSegmentPoolSize (4), and ReturnSegmentUnsynchronized stops
// pooling segments at the same value as MaxSegmentPoolSize (256).
internal const int InitialSegmentPoolSize = 4;
internal const int MaxSegmentPoolSize = 256;
// Lower bound applied to every requested segment size (see GetSegmentSize); copied from options.MinimumBufferSize.
private readonly int _minimumBufferSize;
// First and last segments of the chain of buffered segments; both are null when nothing is buffered.
private BufferSegment _head;
private BufferSegment _tail;
// Writable remainder of the tail segment; Advance slices consumed bytes off the front.
private Memory<byte> _tailMemory;
// Bytes advanced in the tail segment that have not yet been added to _tail.End.
private int _tailBytesBuffered;
// Total bytes advanced since the last FlushInternal; exposed through UnflushedBytes.
private int _bytesBuffered;
// Custom memory pool, or null when options.Pool is MemoryPool<byte>.Shared (ArrayPool<byte>.Shared is rented from instead).
private readonly MemoryPool<byte> _pool;
// _pool.MaxBufferSize, or -1 when _pool is null.
private readonly int _maxPooledBufferSize;
// Created on first use by the InternalTokenSource property; disposed in Complete if it was created.
private CancellationTokenSource _internalTokenSource;
// Set by Complete; GetMemory and GetSpan refuse to hand out memory once it is true.
private bool _isCompleted;
// Lock taken while lazily creating _internalTokenSource.
private readonly object _lockObject = new object();
// Stack of BufferSegment instances kept for reuse instead of allocating new ones.
private BufferSegmentStack _bufferSegmentPool;
// When false, Complete disposes InnerStream.
private readonly bool _leaveOpen;
/// <summary>
/// Returns the <see cref="CancellationTokenSource"/> cancelled by <c>Cancel()</c>,
/// creating it on first access. The check-and-create runs inside
/// <c>lock (_lockObject)</c>.
/// </summary>
[Nullable(1)]
private CancellationTokenSource InternalTokenSource
{
    get
    {
        lock (_lockObject)
        {
            if (_internalTokenSource == null)
            {
                _internalTokenSource = new CancellationTokenSource();
            }

            return _internalTokenSource;
        }
    }
}
/// <summary>
/// The stream this writer writes to and flushes. Assigned once in the constructor
/// from the <c>writingStream</c> argument.
/// </summary>
[Nullable(1)]
public Stream InnerStream {
[NullableContext(1)]
get;
}
/// <summary>
/// Always <c>true</c>: this writer reports its buffered byte count through
/// <see cref="UnflushedBytes"/>.
/// </summary>
public override bool CanGetUnflushedBytes
{
    get { return true; }
}

/// <summary>
/// Number of bytes passed to <see cref="Advance"/> since the buffered data was
/// last flushed (the value of <c>_bytesBuffered</c>).
/// </summary>
public override long UnflushedBytes
{
    get { return _bytesBuffered; }
}
/// <summary>
/// Creates a writer that buffers data in pooled segments and writes it to
/// <paramref name="writingStream"/>.
/// </summary>
/// <param name="writingStream">Destination stream; exposed as <see cref="InnerStream"/>.</param>
/// <param name="options">
/// Supplies the minimum segment size, the memory pool, and whether the stream is
/// left open when the writer completes.
/// </param>
/// <remarks>
/// A null argument is reported through <c>ThrowHelper.ThrowArgumentNullException</c>.
/// </remarks>
[NullableContext(1)]
public StreamPipeWriter(Stream writingStream, StreamPipeWriterOptions options)
{
    if (writingStream == null)
    {
        ThrowHelper.ThrowArgumentNullException(ExceptionArgument.writingStream);
    }

    if (options == null)
    {
        ThrowHelper.ThrowArgumentNullException(ExceptionArgument.options);
    }

    InnerStream = writingStream;
    _minimumBufferSize = options.MinimumBufferSize;

    // The shared pool is represented as "no pool": AllocateSegment then rents
    // arrays from ArrayPool<byte>.Shared instead.
    _pool = options.Pool == MemoryPool<byte>.Shared ? null : options.Pool;

    // -1 means "nothing fits in the pool", so any non-negative size hint takes
    // the array-pool path when there is no custom pool.
    _maxPooledBufferSize = _pool?.MaxBufferSize ?? -1;

    // Use the declared constant rather than a repeated literal so the two cannot drift apart.
    _bufferSegmentPool = new BufferSegmentStack(InitialSegmentPoolSize);
    _leaveOpen = options.LeaveOpen;
}
/// <summary>
/// Records that <paramref name="bytes"/> bytes were written into the memory
/// previously returned by <see cref="GetMemory"/> or <see cref="GetSpan"/>.
/// </summary>
/// <param name="bytes">
/// Number of bytes written. Must be between 0 and the remaining tail memory
/// length, inclusive; otherwise <c>ThrowHelper.ThrowArgumentOutOfRangeException</c> is invoked.
/// </param>
public override void Advance(int bytes)
{
    int writable = _tailMemory.Length;
    if (bytes < 0 || bytes > writable)
    {
        ThrowHelper.ThrowArgumentOutOfRangeException(ExceptionArgument.bytes);
    }

    _tailBytesBuffered += bytes;
    _bytesBuffered += bytes;

    // Drop the consumed prefix so the next GetMemory/GetSpan starts after it.
    _tailMemory = _tailMemory.Slice(bytes);
}
/// <summary>
/// Returns the writable remainder of the tail segment, allocating a new segment
/// first when there is none or the remainder is smaller than <paramref name="sizeHint"/>.
/// </summary>
/// <param name="sizeHint">Minimum number of bytes wanted; must not be negative.</param>
/// <returns>The memory to write into before calling <see cref="Advance"/>.</returns>
public override Memory<byte> GetMemory(int sizeHint = 0)
{
    // No writing is allowed once Complete has run.
    if (_isCompleted)
    {
        ThrowHelper.ThrowInvalidOperationException_NoWritingAllowed();
    }

    if (sizeHint < 0)
    {
        ThrowHelper.ThrowArgumentOutOfRangeException(ExceptionArgument.sizeHint);
    }

    AllocateMemory(sizeHint);

    return _tailMemory;
}
/// <summary>
/// Span-returning variant of <see cref="GetMemory"/>: performs the same checks
/// and allocation, then returns the span over the writable tail memory.
/// </summary>
/// <param name="sizeHint">Minimum number of bytes wanted; must not be negative.</param>
/// <returns>The span to write into before calling <see cref="Advance"/>.</returns>
public override Span<byte> GetSpan(int sizeHint = 0)
{
    // No writing is allowed once Complete has run.
    if (_isCompleted)
    {
        ThrowHelper.ThrowInvalidOperationException_NoWritingAllowed();
    }

    if (sizeHint < 0)
    {
        ThrowHelper.ThrowArgumentOutOfRangeException(ExceptionArgument.sizeHint);
    }

    AllocateMemory(sizeHint);

    Memory<byte> writable = _tailMemory;
    return writable.Span;
}
/// <summary>
/// Ensures the tail segment has room to write: creates the first segment when the
/// chain is empty, or appends a new one when the tail's remaining memory is
/// exhausted or smaller than <paramref name="sizeHint"/>.
/// </summary>
/// <param name="sizeHint">Minimum number of writable bytes requested.</param>
private void AllocateMemory(int sizeHint)
{
    if (_head == null)
    {
        // Nothing buffered yet: the new segment is both head and tail.
        BufferSegment first = AllocateSegment(sizeHint);
        _tail = first;
        _head = first;
        _tailBytesBuffered = 0;
        return;
    }

    int remaining = _tailMemory.Length;
    if (remaining != 0 && remaining >= sizeHint)
    {
        // The current tail still satisfies the request.
        return;
    }

    // Commit what was written to the current tail before moving on to a new one.
    if (_tailBytesBuffered > 0)
    {
        _tail.End += _tailBytesBuffered;
        _tailBytesBuffered = 0;
    }

    BufferSegment appended = AllocateSegment(sizeHint);
    _tail.SetNext(appended);
    _tail = appended;
}
/// <summary>
/// Obtains a segment, backs it with rented memory, and makes that memory the
/// current writable tail memory.
/// </summary>
/// <param name="sizeHint">Minimum number of bytes requested.</param>
/// <returns>The segment, with <c>_tailMemory</c> set to its available memory.</returns>
private BufferSegment AllocateSegment(int sizeHint)
{
    BufferSegment segment = CreateSegmentUnsynchronized();
    int pooledLimit = _maxPooledBufferSize;

    if (sizeHint > pooledLimit)
    {
        // Either the request exceeds what the custom pool can provide, or there is
        // no custom pool at all (limit is -1): rent a plain array instead.
        int arraySize = GetSegmentSize(sizeHint);
        segment.SetOwnedMemory(ArrayPool<byte>.Shared.Rent(arraySize));
    }
    else
    {
        // The constructor sets the limit to -1 when _pool is null, so for the
        // non-negative hints validated by GetMemory/GetSpan this branch implies
        // a custom pool exists.
        int pooledSize = GetSegmentSize(sizeHint, pooledLimit);
        segment.SetOwnedMemory(_pool.Rent(pooledSize));
    }

    _tailMemory = segment.AvailableMemory;
    return segment;
}
/// <summary>
/// Computes the size to rent: at least <c>_minimumBufferSize</c>, and at most
/// <paramref name="maxBufferSize"/>. The upper bound is applied last, so it wins
/// if it is smaller than the minimum.
/// </summary>
/// <param name="sizeHint">Size requested by the caller.</param>
/// <param name="maxBufferSize">Largest size the target pool can provide.</param>
/// <returns>The clamped size.</returns>
private int GetSegmentSize(int sizeHint, int maxBufferSize = int.MaxValue)
{
    int atLeastMinimum = Math.Max(_minimumBufferSize, sizeHint);
    int clamped = Math.Min(maxBufferSize, atLeastMinimum);
    return clamped;
}
/// <summary>
/// Returns a segment popped from <c>_bufferSegmentPool</c> when one is available,
/// otherwise a newly constructed one. Takes no lock.
/// </summary>
private BufferSegment CreateSegmentUnsynchronized()
{
    return _bufferSegmentPool.TryPop(out BufferSegment pooled)
        ? pooled
        : new BufferSegment();
}
/// <summary>
/// Resets <paramref name="segment"/> and keeps it for reuse unless the segment
/// pool already holds <see cref="MaxSegmentPoolSize"/> entries, in which case the
/// segment is simply dropped. Takes no lock.
/// </summary>
/// <param name="segment">Segment that is no longer part of the buffered chain.</param>
private void ReturnSegmentUnsynchronized(BufferSegment segment)
{
    segment.Reset();

    // Use the declared cap rather than a repeated literal so the two cannot drift apart.
    if (_bufferSegmentPool.Count < MaxSegmentPoolSize)
    {
        _bufferSegmentPool.Push(segment);
    }
}
/// <summary>
/// Cancels the internal token source by delegating to <c>Cancel()</c>.
/// </summary>
public override void CancelPendingFlush() => Cancel();
/// <summary>
/// Marks the writer as completed. Buffered data is written to the stream only
/// when <paramref name="exception"/> is null; otherwise the buffered segments are
/// released without being written. Calling this again after completion does nothing.
/// </summary>
/// <param name="exception">Optional error; a non-null value suppresses the final write.</param>
[NullableContext(2)]
public override void Complete(Exception exception = null)
{
    if (_isCompleted)
    {
        return;
    }

    _isCompleted = true;

    try
    {
        bool writeBufferedData = exception == null;
        FlushInternal(writeBufferedData);
    }
    finally
    {
        // Clean up even when the final write or flush throws.
        CancellationTokenSource tokenSource = _internalTokenSource;
        if (tokenSource != null)
        {
            tokenSource.Dispose();
        }

        if (!_leaveOpen)
        {
            InnerStream.Dispose();
        }
    }
}
// Decompiled stub of an async method. It only initializes the compiler-generated
// state machine <CompleteAsync>d__33 (capturing this instance and the exception
// argument), starts it, and returns the builder's task. The method's actual logic
// lives in that state machine, which is not part of this listing.
// NOTE(review): presumably the asynchronous counterpart of Complete — confirm
// against the state machine body.
[NullableContext(2)]
[AsyncStateMachine(typeof(<CompleteAsync>d__33))]
public override ValueTask CompleteAsync(Exception exception = null)
{
<CompleteAsync>d__33 stateMachine;
stateMachine.<>t__builder = AsyncValueTaskMethodBuilder.Create();
stateMachine.<>4__this = this;
stateMachine.exception = exception;
// -1 is the state value assigned before the state machine is started.
stateMachine.<>1__state = -1;
stateMachine.<>t__builder.Start(ref stateMachine);
return stateMachine.<>t__builder.Task;
}
/// <summary>
/// Writes all buffered data to the stream. When nothing is buffered, returns an
/// already-completed result (not canceled, not completed) without touching the stream.
/// </summary>
/// <param name="cancellationToken">Token forwarded to the internal flush.</param>
public override ValueTask<FlushResult> FlushAsync(CancellationToken cancellationToken = default(CancellationToken))
{
    return _bytesBuffered == 0
        ? new ValueTask<FlushResult>(new FlushResult(isCanceled: false, isCompleted: false))
        : FlushAsyncInternal(writeToStream: true, data: Memory<byte>.Empty, cancellationToken: cancellationToken);
}
/// <summary>
/// Passes <paramref name="source"/> to the internal flush routine as its
/// <c>data</c> argument, with writing to the stream enabled.
/// </summary>
/// <param name="source">Data handed to the internal flush.</param>
/// <param name="cancellationToken">Token forwarded to the internal flush.</param>
public override ValueTask<FlushResult> WriteAsync(ReadOnlyMemory<byte> source, CancellationToken cancellationToken = default(CancellationToken))
    => FlushAsyncInternal(writeToStream: true, data: source, cancellationToken: cancellationToken);
/// <summary>
/// Cancels the internal token source, creating it first if it does not exist yet.
/// </summary>
private void Cancel() => InternalTokenSource.Cancel();
// Decompiled stub of an async method. It only initializes the compiler-generated
// state machine <FlushAsyncInternal>d__39 (capturing this instance and the three
// arguments), starts it, and returns the builder's task. The method's actual logic
// lives in that state machine, which is not part of this listing.
// Called by FlushAsync (with empty data) and WriteAsync (with the caller's data),
// both passing writeToStream = true.
[AsyncStateMachine(typeof(<FlushAsyncInternal>d__39))]
[AsyncMethodBuilder(typeof(PoolingAsyncValueTaskMethodBuilder<>))]
private ValueTask<FlushResult> FlushAsyncInternal(bool writeToStream, ReadOnlyMemory<byte> data, CancellationToken cancellationToken = default(CancellationToken))
{
<FlushAsyncInternal>d__39 stateMachine;
stateMachine.<>t__builder = PoolingAsyncValueTaskMethodBuilder<FlushResult>.Create();
stateMachine.<>4__this = this;
stateMachine.writeToStream = writeToStream;
stateMachine.data = data;
stateMachine.cancellationToken = cancellationToken;
// -1 is the state value assigned before the state machine is started.
stateMachine.<>1__state = -1;
stateMachine.<>t__builder.Start(ref stateMachine);
return stateMachine.<>t__builder.Task;
}
/// <summary>
/// Synchronously drains the buffered segment chain: each non-empty segment is
/// written to <see cref="InnerStream"/> when <paramref name="writeToStream"/> is
/// true, every segment is returned for reuse, and the buffer state is reset.
/// </summary>
/// <param name="writeToStream">
/// When false, segments are released without writing and the stream is not flushed.
/// </param>
private void FlushInternal(bool writeToStream)
{
    // Commit bytes advanced in the tail that have not been added to its End yet.
    if (_tailBytesBuffered > 0)
    {
        _tail.End += _tailBytesBuffered;
        _tailBytesBuffered = 0;
    }

    BufferSegment next = _head;
    while (next != null)
    {
        BufferSegment current = next;

        // Capture the successor before the segment is returned, since returning
        // calls Reset() on it (presumably clearing the link — confirm in BufferSegment).
        next = current.NextSegment;

        if (current.Length > 0 && writeToStream)
        {
            InnerStream.Write(current.Memory.Span);
        }

        ReturnSegmentUnsynchronized(current);

        // Advance the head only after the segment has been returned, so a throwing
        // Write leaves the unwritten segment at the front of the chain.
        _head = next;
    }

    if (_bytesBuffered > 0 && writeToStream)
    {
        InnerStream.Flush();
    }

    // Reset the buffer state only after the stream flush has succeeded.
    _head = null;
    _tail = null;
    _tailMemory = default(Memory<byte>);
    _bytesBuffered = 0;
}
}
}