PipeReaderStream
using System.Buffers;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
namespace System.IO.Pipelines
{
[System.Runtime.CompilerServices.NullableContext(1)]
[System.Runtime.CompilerServices.Nullable(0)]
internal sealed class PipeReaderStream : Stream
{
private readonly PipeReader _pipeReader;
public override bool CanRead => true;
public override bool CanSeek => false;
public override bool CanWrite => false;
public override long Length {
get {
throw new NotSupportedException();
}
}
public override long Position {
get {
throw new NotSupportedException();
}
set {
throw new NotSupportedException();
}
}
internal bool LeaveOpen { get; set; }
public PipeReaderStream(PipeReader pipeReader, bool leaveOpen)
{
_pipeReader = pipeReader;
LeaveOpen = leaveOpen;
}
protected override void Dispose(bool disposing)
{
if (!LeaveOpen)
_pipeReader.Complete(null);
base.Dispose(disposing);
}
public override void Flush()
{
}
public override int Read(byte[] buffer, int offset, int count)
{
if (buffer == null)
ThrowHelper.ThrowArgumentNullException(ExceptionArgument.buffer);
return ReadInternal(new Span<byte>(buffer, offset, count));
}
public unsafe override int ReadByte()
{
Span<byte> buffer = new Span<byte>(stackalloc byte[1], 1);
if (ReadInternal(buffer) != 0)
return buffer[0];
return -1;
}
private int ReadInternal(Span<byte> buffer)
{
ValueTask<ReadResult> valueTask = _pipeReader.ReadAsync(default(CancellationToken));
ReadResult result = valueTask.IsCompletedSuccessfully ? valueTask.Result : valueTask.AsTask().GetAwaiter().GetResult();
return HandleReadResult(result, buffer);
}
public override long Seek(long offset, SeekOrigin origin)
{
throw new NotSupportedException();
}
public override void SetLength(long value)
{
throw new NotSupportedException();
}
public override void Write(byte[] buffer, int offset, int count)
{
throw new NotSupportedException();
}
public sealed override IAsyncResult BeginRead(byte[] buffer, int offset, int count, [System.Runtime.CompilerServices.Nullable(2)] AsyncCallback callback, [System.Runtime.CompilerServices.Nullable(2)] object state)
{
return System.Threading.Tasks.TaskToAsyncResult.Begin(ReadAsync(buffer, offset, count, default(CancellationToken)), callback, state);
}
public sealed override int EndRead(IAsyncResult asyncResult)
{
return System.Threading.Tasks.TaskToAsyncResult.End<int>(asyncResult);
}
public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
if (buffer == null)
ThrowHelper.ThrowArgumentNullException(ExceptionArgument.buffer);
return ReadAsyncInternal(new Memory<byte>(buffer, offset, count), cancellationToken).AsTask();
}
[AsyncStateMachine(typeof(<ReadAsyncInternal>d__28))]
private ValueTask<int> ReadAsyncInternal(Memory<byte> buffer, CancellationToken cancellationToken)
{
<ReadAsyncInternal>d__28 stateMachine = default(<ReadAsyncInternal>d__28);
stateMachine.<>t__builder = AsyncValueTaskMethodBuilder<int>.Create();
stateMachine.<>4__this = this;
stateMachine.buffer = buffer;
stateMachine.cancellationToken = cancellationToken;
stateMachine.<>1__state = -1;
stateMachine.<>t__builder.Start(ref stateMachine);
return stateMachine.<>t__builder.Task;
}
private int HandleReadResult(ReadResult result, Span<byte> buffer)
{
if (result.IsCanceled)
ThrowHelper.ThrowOperationCanceledException_ReadCanceled();
ReadOnlySequence<byte> buffer2 = result.Buffer;
long length = buffer2.Length;
SequencePosition consumed = buffer2.Start;
try {
if (length != 0) {
int num = (int)Math.Min(length, buffer.Length);
ReadOnlySequence<byte> source = (num == length) ? buffer2 : buffer2.Slice(0, num);
consumed = source.End;
ref source.CopyTo(buffer);
return num;
}
if (result.IsCompleted)
return 0;
} finally {
_pipeReader.AdvanceTo(consumed);
}
ThrowHelper.ThrowInvalidOperationException_InvalidZeroByteRead();
return 0;
}
public override Task CopyToAsync(Stream destination, int bufferSize, CancellationToken cancellationToken)
{
StreamHelpers.ValidateCopyToArgs(this, destination, bufferSize);
return _pipeReader.CopyToAsync(destination, cancellationToken);
}
}
}