Pipe
The default PipeWriter and PipeReader implementation.
using System.Buffers;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Sources;
namespace System.IO.Pipelines
{
[System.Runtime.CompilerServices.NullableContext(1)]
[System.Runtime.CompilerServices.Nullable(0)]
public sealed class Pipe
{
private sealed class DefaultPipeReader : PipeReader, IValueTaskSource<ReadResult>
{
private readonly Pipe _pipe;
public DefaultPipeReader(Pipe pipe)
{
_pipe = pipe;
}
public override bool TryRead(out ReadResult result)
{
return _pipe.TryRead(out result);
}
public override ValueTask<ReadResult> ReadAsync(CancellationToken cancellationToken = default(CancellationToken))
{
return _pipe.ReadAsync(cancellationToken);
}
protected override ValueTask<ReadResult> ReadAtLeastAsyncCore(int minimumBytes, CancellationToken cancellationToken)
{
return _pipe.ReadAtLeastAsync(minimumBytes, cancellationToken);
}
public override void AdvanceTo(SequencePosition consumed)
{
_pipe.AdvanceReader(ref consumed);
}
public override void AdvanceTo(SequencePosition consumed, SequencePosition examined)
{
_pipe.AdvanceReader(ref consumed, ref examined);
}
public override void CancelPendingRead()
{
_pipe.CancelPendingRead();
}
public override void Complete(Exception exception = null)
{
_pipe.CompleteReader(exception);
}
public override void OnWriterCompleted(Action<Exception, object> callback, object state)
{
_pipe.OnWriterCompleted(callback, state);
}
public ValueTaskSourceStatus GetStatus(short token)
{
return _pipe.GetReadAsyncStatus();
}
public ReadResult GetResult(short token)
{
return _pipe.GetReadAsyncResult();
}
public void OnCompleted(Action<object> continuation, object state, short token, ValueTaskSourceOnCompletedFlags flags)
{
_pipe.OnReadAsyncCompleted(continuation, state, flags);
}
}
private sealed class DefaultPipeWriter : PipeWriter, IValueTaskSource<FlushResult>
{
private readonly Pipe _pipe;
public override bool CanGetUnflushedBytes => true;
public override long UnflushedBytes => _pipe.GetUnflushedBytes();
public DefaultPipeWriter(Pipe pipe)
{
_pipe = pipe;
}
public override void Complete(Exception exception = null)
{
_pipe.CompleteWriter(exception);
}
public override void CancelPendingFlush()
{
_pipe.CancelPendingFlush();
}
public override void OnReaderCompleted(Action<Exception, object> callback, object state)
{
_pipe.OnReaderCompleted(callback, state);
}
public override ValueTask<FlushResult> FlushAsync(CancellationToken cancellationToken = default(CancellationToken))
{
return _pipe.FlushAsync(cancellationToken);
}
public override void Advance(int bytes)
{
_pipe.Advance(bytes);
}
public override Memory<byte> GetMemory(int sizeHint = 0)
{
return _pipe.GetMemory(sizeHint);
}
public override Span<byte> GetSpan(int sizeHint = 0)
{
return _pipe.GetSpan(sizeHint);
}
public ValueTaskSourceStatus GetStatus(short token)
{
return _pipe.GetFlushAsyncStatus();
}
public FlushResult GetResult(short token)
{
return _pipe.GetFlushAsyncResult();
}
public void OnCompleted(Action<object> continuation, object state, short token, ValueTaskSourceOnCompletedFlags flags)
{
_pipe.OnFlushAsyncCompleted(continuation, state, flags);
}
public override ValueTask<FlushResult> WriteAsync(ReadOnlyMemory<byte> source, CancellationToken cancellationToken = default(CancellationToken))
{
return _pipe.WriteAsync(source, cancellationToken);
}
}
private static readonly Action<object> s_signalReaderAwaitable = delegate(object state) {
((Pipe)state).ReaderCancellationRequested();
};
private static readonly Action<object> s_signalWriterAwaitable = delegate(object state) {
((Pipe)state).WriterCancellationRequested();
};
private static readonly Action<object> s_invokeCompletionCallbacks = delegate(object state) {
((PipeCompletionCallbacks)state).Execute();
};
private static readonly ContextCallback s_executionContextRawCallback = ExecuteWithoutExecutionContext;
private static readonly SendOrPostCallback s_syncContextExecutionContextCallback = ExecuteWithExecutionContext;
private static readonly SendOrPostCallback s_syncContextExecuteWithoutExecutionContextCallback = ExecuteWithoutExecutionContext;
private static readonly Action<object> s_scheduleWithExecutionContextCallback = ExecuteWithExecutionContext;
private BufferSegmentStack _bufferSegmentPool;
private readonly DefaultPipeReader _reader;
private readonly DefaultPipeWriter _writer;
private readonly PipeOptions _options;
private readonly object _sync = new object();
private long _unconsumedBytes;
private long _unflushedBytes;
private PipeAwaitable _readerAwaitable;
private PipeAwaitable _writerAwaitable;
private PipeCompletion _writerCompletion;
private PipeCompletion _readerCompletion;
private long _lastExaminedIndex = -1;
private BufferSegment _readHead;
private int _readHeadIndex;
private bool _disposed;
private BufferSegment _readTail;
private int _readTailIndex;
private int _minimumReadBytes;
private BufferSegment _writingHead;
private Memory<byte> _writingHeadMemory;
private int _writingHeadBytesBuffered;
private PipeOperationState _operationState;
private bool UseSynchronizationContext => _options.UseSynchronizationContext;
private int MinimumSegmentSize => _options.MinimumSegmentSize;
private long PauseWriterThreshold => _options.PauseWriterThreshold;
private long ResumeWriterThreshold => _options.ResumeWriterThreshold;
private PipeScheduler ReaderScheduler => _options.ReaderScheduler;
private PipeScheduler WriterScheduler => _options.WriterScheduler;
private object SyncObj => _sync;
internal long Length => _unconsumedBytes;
public PipeReader Reader => _reader;
public PipeWriter Writer => _writer;
public Pipe()
: this(PipeOptions.Default)
{
}
public Pipe(PipeOptions options)
{
if (options == null)
ThrowHelper.ThrowArgumentNullException(ExceptionArgument.options);
_bufferSegmentPool = new BufferSegmentStack(options.InitialSegmentPoolSize);
_operationState = default(PipeOperationState);
_readerCompletion = default(PipeCompletion);
_writerCompletion = default(PipeCompletion);
_options = options;
_readerAwaitable = new PipeAwaitable(false, UseSynchronizationContext);
_writerAwaitable = new PipeAwaitable(true, UseSynchronizationContext);
_reader = new DefaultPipeReader(this);
_writer = new DefaultPipeWriter(this);
}
private void ResetState()
{
_readerCompletion.Reset();
_writerCompletion.Reset();
_readerAwaitable = new PipeAwaitable(false, UseSynchronizationContext);
_writerAwaitable = new PipeAwaitable(true, UseSynchronizationContext);
_readTailIndex = 0;
_readHeadIndex = 0;
_lastExaminedIndex = -1;
_unflushedBytes = 0;
_unconsumedBytes = 0;
}
[System.Runtime.CompilerServices.NullableContext(0)]
internal Memory<byte> GetMemory(int sizeHint)
{
if (_writerCompletion.IsCompleted)
ThrowHelper.ThrowInvalidOperationException_NoWritingAllowed();
if (sizeHint < 0)
ThrowHelper.ThrowArgumentOutOfRangeException(ExceptionArgument.sizeHint);
AllocateWriteHeadIfNeeded(sizeHint);
return _writingHeadMemory;
}
[System.Runtime.CompilerServices.NullableContext(0)]
internal Span<byte> GetSpan(int sizeHint)
{
if (_writerCompletion.IsCompleted)
ThrowHelper.ThrowInvalidOperationException_NoWritingAllowed();
if (sizeHint < 0)
ThrowHelper.ThrowArgumentOutOfRangeException(ExceptionArgument.sizeHint);
AllocateWriteHeadIfNeeded(sizeHint);
return _writingHeadMemory.Span;
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private void AllocateWriteHeadIfNeeded(int sizeHint)
{
if (!_operationState.IsWritingActive || _writingHeadMemory.Length == 0 || _writingHeadMemory.Length < sizeHint)
AllocateWriteHeadSynchronized(sizeHint);
}
private void AllocateWriteHeadSynchronized(int sizeHint)
{
lock (SyncObj) {
_operationState.BeginWrite();
if (_writingHead == null) {
BufferSegment readTail = AllocateSegment(sizeHint);
_writingHead = (_readHead = (_readTail = readTail));
_lastExaminedIndex = 0;
} else {
int length = _writingHeadMemory.Length;
if (length == 0 || length < sizeHint) {
if (_writingHeadBytesBuffered > 0) {
_writingHead.End += _writingHeadBytesBuffered;
_writingHeadBytesBuffered = 0;
}
if (_writingHead.Length == 0) {
_writingHead.ResetMemory();
RentMemory(_writingHead, sizeHint);
} else {
BufferSegment bufferSegment = AllocateSegment(sizeHint);
_writingHead.SetNext(bufferSegment);
_writingHead = bufferSegment;
}
}
}
}
}
private BufferSegment AllocateSegment(int sizeHint)
{
BufferSegment bufferSegment = CreateSegmentUnsynchronized();
RentMemory(bufferSegment, sizeHint);
return bufferSegment;
}
private void RentMemory(BufferSegment segment, int sizeHint)
{
MemoryPool<byte> memoryPool = null;
int num = -1;
if (!_options.IsDefaultSharedMemoryPool) {
memoryPool = _options.Pool;
num = memoryPool.MaxBufferSize;
}
if (sizeHint <= num)
segment.SetOwnedMemory(memoryPool.Rent(GetSegmentSize(sizeHint, num)));
else {
int segmentSize = GetSegmentSize(sizeHint, 2147483647);
segment.SetOwnedMemory(ArrayPool<byte>.Shared.Rent(segmentSize));
}
_writingHeadMemory = segment.AvailableMemory;
}
private int GetSegmentSize(int sizeHint, int maxBufferSize = int.MaxValue)
{
sizeHint = Math.Max(MinimumSegmentSize, sizeHint);
return Math.Min(maxBufferSize, sizeHint);
}
private BufferSegment CreateSegmentUnsynchronized()
{
if (_bufferSegmentPool.TryPop(out BufferSegment result))
return result;
return new BufferSegment();
}
private void ReturnSegmentUnsynchronized(BufferSegment segment)
{
if (_bufferSegmentPool.Count < _options.MaxSegmentPoolSize)
_bufferSegmentPool.Push(segment);
}
internal bool CommitUnsynchronized()
{
_operationState.EndWrite();
if (_unflushedBytes == 0)
return false;
_writingHead.End += _writingHeadBytesBuffered;
_readTail = _writingHead;
_readTailIndex = _writingHead.End;
long unconsumedBytes = _unconsumedBytes;
_unconsumedBytes += _unflushedBytes;
bool result = true;
if (_unconsumedBytes < _minimumReadBytes)
result = false;
else if (PauseWriterThreshold > 0 && unconsumedBytes < PauseWriterThreshold && _unconsumedBytes >= PauseWriterThreshold && !_readerCompletion.IsCompleted) {
_writerAwaitable.SetUncompleted();
}
_unflushedBytes = 0;
_writingHeadBytesBuffered = 0;
return result;
}
internal void Advance(int bytes)
{
lock (SyncObj) {
if ((uint)bytes > (uint)_writingHeadMemory.Length)
ThrowHelper.ThrowArgumentOutOfRangeException(ExceptionArgument.bytes);
if (!_readerCompletion.IsCompleted)
AdvanceCore(bytes);
}
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private void AdvanceCore(int bytesWritten)
{
_unflushedBytes += bytesWritten;
_writingHeadBytesBuffered += bytesWritten;
_writingHeadMemory = _writingHeadMemory.Slice(bytesWritten);
}
[System.Runtime.CompilerServices.NullableContext(0)]
internal ValueTask<FlushResult> FlushAsync(CancellationToken cancellationToken)
{
if (cancellationToken.IsCancellationRequested)
return new ValueTask<FlushResult>(Task.FromCanceled<FlushResult>(cancellationToken));
CompletionData completionData;
ValueTask<FlushResult> result;
lock (SyncObj) {
PrepareFlushUnsynchronized(out completionData, out result, cancellationToken);
}
TrySchedule(ReaderScheduler, ref completionData);
return result;
}
private void PrepareFlushUnsynchronized(out CompletionData completionData, out ValueTask<FlushResult> result, CancellationToken cancellationToken)
{
bool num = CommitUnsynchronized();
_writerAwaitable.BeginOperation(cancellationToken, s_signalWriterAwaitable, this);
if (_writerAwaitable.IsCompleted) {
FlushResult result2 = default(FlushResult);
GetFlushResult(ref result2);
result = new ValueTask<FlushResult>(result2);
} else
result = new ValueTask<FlushResult>(_writer, 0);
if (num)
_readerAwaitable.Complete(out completionData);
else
completionData = default(CompletionData);
}
[System.Runtime.CompilerServices.NullableContext(2)]
internal void CompleteWriter(Exception exception)
{
PipeCompletionCallbacks pipeCompletionCallbacks = default(PipeCompletionCallbacks);
CompletionData completionData;
bool isCompleted = default(bool);
lock (SyncObj) {
CommitUnsynchronized();
pipeCompletionCallbacks = _writerCompletion.TryComplete(exception);
_readerAwaitable.Complete(out completionData);
isCompleted = _readerCompletion.IsCompleted;
}
if (isCompleted)
CompletePipe();
if (pipeCompletionCallbacks != null)
ScheduleCallbacks(ReaderScheduler, pipeCompletionCallbacks);
TrySchedule(ReaderScheduler, ref completionData);
}
internal void AdvanceReader([In] [System.Runtime.CompilerServices.IsReadOnly] ref SequencePosition consumed)
{
AdvanceReader(ref consumed, ref consumed);
}
internal void AdvanceReader([In] [System.Runtime.CompilerServices.IsReadOnly] ref SequencePosition consumed, [In] [System.Runtime.CompilerServices.IsReadOnly] ref SequencePosition examined)
{
if (_readerCompletion.IsCompleted)
ThrowHelper.ThrowInvalidOperationException_NoReadingAllowed();
AdvanceReader((BufferSegment)consumed.GetObject(), consumed.GetInteger(), (BufferSegment)examined.GetObject(), examined.GetInteger());
}
private void AdvanceReader(BufferSegment consumedSegment, int consumedIndex, BufferSegment examinedSegment, int examinedIndex)
{
<>c__DisplayClass67_0 <>c__DisplayClass67_ = default(<>c__DisplayClass67_0);
<>c__DisplayClass67_.<>4__this = this;
if (consumedSegment != null && examinedSegment != null && BufferSegment.GetLength(consumedSegment, consumedIndex, examinedSegment, examinedIndex) < 0)
ThrowHelper.ThrowInvalidOperationException_InvalidExaminedOrConsumedPosition();
BufferSegment bufferSegment = null;
<>c__DisplayClass67_.returnEnd = null;
CompletionData completionData = default(CompletionData);
lock (SyncObj) {
bool flag = false;
if (examinedSegment == _readTail)
flag = (examinedIndex == _readTailIndex);
if (examinedSegment != null && _lastExaminedIndex >= 0) {
long length = BufferSegment.GetLength(_lastExaminedIndex, examinedSegment, examinedIndex);
long unconsumedBytes = _unconsumedBytes;
if (length < 0)
ThrowHelper.ThrowInvalidOperationException_InvalidExaminedPosition();
_unconsumedBytes -= length;
_lastExaminedIndex = examinedSegment.RunningIndex + examinedIndex;
if (unconsumedBytes >= ResumeWriterThreshold && _unconsumedBytes < ResumeWriterThreshold)
_writerAwaitable.Complete(out completionData);
}
if (consumedSegment != null) {
if (_readHead == null) {
ThrowHelper.ThrowInvalidOperationException_AdvanceToInvalidCursor();
return;
}
bufferSegment = _readHead;
<>c__DisplayClass67_.returnEnd = consumedSegment;
if (consumedIndex == <>c__DisplayClass67_.returnEnd.Length) {
if (_writingHead != <>c__DisplayClass67_.returnEnd)
<AdvanceReader>g__MoveReturnEndToNextBlock|67_0(ref <>c__DisplayClass67_);
else if (_writingHeadBytesBuffered == 0 && !_operationState.IsWritingActive) {
_writingHead = null;
_writingHeadMemory = default(Memory<byte>);
<AdvanceReader>g__MoveReturnEndToNextBlock|67_0(ref <>c__DisplayClass67_);
} else {
_readHead = consumedSegment;
_readHeadIndex = consumedIndex;
}
} else {
_readHead = consumedSegment;
_readHeadIndex = consumedIndex;
}
}
if (flag && !_writerCompletion.IsCompleted)
_readerAwaitable.SetUncompleted();
while (bufferSegment != null && bufferSegment != <>c__DisplayClass67_.returnEnd) {
BufferSegment nextSegment = bufferSegment.NextSegment;
bufferSegment.Reset();
ReturnSegmentUnsynchronized(bufferSegment);
bufferSegment = nextSegment;
}
_operationState.EndRead();
}
TrySchedule(WriterScheduler, ref completionData);
}
[System.Runtime.CompilerServices.NullableContext(2)]
internal void CompleteReader(Exception exception)
{
PipeCompletionCallbacks pipeCompletionCallbacks = default(PipeCompletionCallbacks);
CompletionData completionData;
bool isCompleted = default(bool);
lock (SyncObj) {
if (_operationState.IsReadingActive)
_operationState.EndRead();
pipeCompletionCallbacks = _readerCompletion.TryComplete(exception);
_writerAwaitable.Complete(out completionData);
isCompleted = _writerCompletion.IsCompleted;
}
if (isCompleted)
CompletePipe();
if (pipeCompletionCallbacks != null)
ScheduleCallbacks(WriterScheduler, pipeCompletionCallbacks);
TrySchedule(WriterScheduler, ref completionData);
}
[System.Runtime.CompilerServices.NullableContext(2)]
internal void OnWriterCompleted([System.Runtime.CompilerServices.Nullable(new byte[] {
1,
2,
2
})] Action<Exception, object> callback, object state)
{
if (callback == null)
ThrowHelper.ThrowArgumentNullException(ExceptionArgument.callback);
PipeCompletionCallbacks pipeCompletionCallbacks = default(PipeCompletionCallbacks);
lock (SyncObj) {
pipeCompletionCallbacks = _writerCompletion.AddCallback(callback, state);
}
if (pipeCompletionCallbacks != null)
ScheduleCallbacks(ReaderScheduler, pipeCompletionCallbacks);
}
internal void CancelPendingRead()
{
CompletionData completionData;
lock (SyncObj) {
_readerAwaitable.Cancel(out completionData);
}
TrySchedule(ReaderScheduler, ref completionData);
}
internal void CancelPendingFlush()
{
CompletionData completionData;
lock (SyncObj) {
_writerAwaitable.Cancel(out completionData);
}
TrySchedule(WriterScheduler, ref completionData);
}
[System.Runtime.CompilerServices.NullableContext(2)]
internal void OnReaderCompleted([System.Runtime.CompilerServices.Nullable(new byte[] {
1,
2,
2
})] Action<Exception, object> callback, object state)
{
if (callback == null)
ThrowHelper.ThrowArgumentNullException(ExceptionArgument.callback);
PipeCompletionCallbacks pipeCompletionCallbacks = default(PipeCompletionCallbacks);
lock (SyncObj) {
pipeCompletionCallbacks = _readerCompletion.AddCallback(callback, state);
}
if (pipeCompletionCallbacks != null)
ScheduleCallbacks(WriterScheduler, pipeCompletionCallbacks);
}
[System.Runtime.CompilerServices.NullableContext(0)]
internal ValueTask<ReadResult> ReadAtLeastAsync(int minimumBytes, CancellationToken token)
{
if (_readerCompletion.IsCompleted)
ThrowHelper.ThrowInvalidOperationException_NoReadingAllowed();
if (token.IsCancellationRequested)
return new ValueTask<ReadResult>(Task.FromCanceled<ReadResult>(token));
CompletionData completionData = default(CompletionData);
ValueTask<ReadResult> result2 = default(ValueTask<ReadResult>);
lock (SyncObj) {
_readerAwaitable.BeginOperation(token, s_signalReaderAwaitable, this);
if (_readerAwaitable.IsCompleted) {
GetReadResult(out ReadResult result);
if (_unconsumedBytes >= minimumBytes || result.IsCanceled || result.IsCompleted)
return new ValueTask<ReadResult>(result);
_readerAwaitable.SetUncompleted();
_operationState.EndRead();
_readerAwaitable.BeginOperation(token, s_signalReaderAwaitable, this);
}
if (!_writerAwaitable.IsCompleted)
_writerAwaitable.Complete(out completionData);
_minimumReadBytes = minimumBytes;
result2 = new ValueTask<ReadResult>(_reader, 0);
}
TrySchedule(WriterScheduler, ref completionData);
return result2;
}
[System.Runtime.CompilerServices.NullableContext(0)]
internal ValueTask<ReadResult> ReadAsync(CancellationToken token)
{
if (_readerCompletion.IsCompleted)
ThrowHelper.ThrowInvalidOperationException_NoReadingAllowed();
if (token.IsCancellationRequested)
return new ValueTask<ReadResult>(Task.FromCanceled<ReadResult>(token));
lock (SyncObj) {
_readerAwaitable.BeginOperation(token, s_signalReaderAwaitable, this);
if (!_readerAwaitable.IsCompleted)
return new ValueTask<ReadResult>(_reader, 0);
GetReadResult(out ReadResult result);
return new ValueTask<ReadResult>(result);
}
}
internal bool TryRead(out ReadResult result)
{
lock (SyncObj) {
if (_readerCompletion.IsCompleted)
ThrowHelper.ThrowInvalidOperationException_NoReadingAllowed();
if (_unconsumedBytes <= 0 && !_readerAwaitable.IsCompleted) {
if (_readerAwaitable.IsRunning)
ThrowHelper.ThrowInvalidOperationException_AlreadyReading();
_operationState.BeginReadTentative();
result = default(ReadResult);
return false;
}
GetReadResult(out result);
return true;
}
}
private static void ScheduleCallbacks(PipeScheduler scheduler, PipeCompletionCallbacks completionCallbacks)
{
scheduler.UnsafeSchedule(s_invokeCompletionCallbacks, completionCallbacks);
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private static void TrySchedule(PipeScheduler scheduler, [In] [System.Runtime.CompilerServices.IsReadOnly] ref CompletionData completionData)
{
Action<object> completion = completionData.Completion;
if (completion != null) {
if (completionData.SynchronizationContext == null && completionData.ExecutionContext == null)
scheduler.UnsafeSchedule(completion, completionData.CompletionState);
else
ScheduleWithContext(scheduler, ref completionData);
}
}
[MethodImpl(MethodImplOptions.NoInlining)]
private static void ScheduleWithContext(PipeScheduler scheduler, [In] [System.Runtime.CompilerServices.IsReadOnly] ref CompletionData completionData)
{
if (completionData.SynchronizationContext == null)
scheduler.UnsafeSchedule(s_scheduleWithExecutionContextCallback, completionData);
else if (completionData.ExecutionContext == null) {
completionData.SynchronizationContext.Post(s_syncContextExecuteWithoutExecutionContextCallback, completionData);
} else {
completionData.SynchronizationContext.Post(s_syncContextExecutionContextCallback, completionData);
}
}
private static void ExecuteWithoutExecutionContext(object state)
{
CompletionData completionData = (CompletionData)state;
completionData.Completion(completionData.CompletionState);
}
private static void ExecuteWithExecutionContext(object state)
{
ExecutionContext.Run(((CompletionData)state).ExecutionContext, s_executionContextRawCallback, state);
}
private void CompletePipe()
{
lock (SyncObj) {
if (!_disposed) {
_disposed = true;
BufferSegment bufferSegment = _readHead ?? _readTail;
while (bufferSegment != null) {
BufferSegment bufferSegment2 = bufferSegment;
bufferSegment = bufferSegment.NextSegment;
bufferSegment2.Reset();
}
_writingHead = null;
_writingHeadMemory = default(Memory<byte>);
_readHead = null;
_readTail = null;
_lastExaminedIndex = -1;
}
}
}
internal ValueTaskSourceStatus GetReadAsyncStatus()
{
if (_readerAwaitable.IsCompleted) {
if (_writerCompletion.IsFaulted)
return ValueTaskSourceStatus.Faulted;
return ValueTaskSourceStatus.Succeeded;
}
return ValueTaskSourceStatus.Pending;
}
[System.Runtime.CompilerServices.NullableContext(2)]
internal void OnReadAsyncCompleted([System.Runtime.CompilerServices.Nullable(new byte[] {
1,
2
})] Action<object> continuation, object state, ValueTaskSourceOnCompletedFlags flags)
{
CompletionData completionData;
bool doubleCompletion;
lock (SyncObj) {
_readerAwaitable.OnCompleted(continuation, state, flags, out completionData, out doubleCompletion);
}
if (doubleCompletion)
Writer.Complete(ThrowHelper.CreateInvalidOperationException_NoConcurrentOperation());
TrySchedule(ReaderScheduler, ref completionData);
}
internal ReadResult GetReadAsyncResult()
{
CancellationTokenRegistration cancellationTokenRegistration = default(CancellationTokenRegistration);
CancellationToken cancellationToken = default(CancellationToken);
ReadResult result;
try {
lock (SyncObj) {
if (!_readerAwaitable.IsCompleted)
ThrowHelper.ThrowInvalidOperationException_GetResultNotCompleted();
cancellationTokenRegistration = _readerAwaitable.ReleaseCancellationTokenRegistration(out cancellationToken);
GetReadResult(out result);
}
} finally {
cancellationTokenRegistration.Dispose();
}
if (result.IsCanceled)
cancellationToken.ThrowIfCancellationRequested();
return result;
}
private void GetReadResult(out ReadResult result)
{
bool isCompleted = _writerCompletion.IsCompletedOrThrow();
bool flag = _readerAwaitable.ObserveCancellation();
BufferSegment readHead = _readHead;
if (readHead != null) {
ReadOnlySequence<byte> buffer = new ReadOnlySequence<byte>(readHead, _readHeadIndex, _readTail, _readTailIndex);
result = new ReadResult(buffer, flag, isCompleted);
} else
result = new ReadResult(default(ReadOnlySequence<byte>), flag, isCompleted);
if (flag)
_operationState.BeginReadTentative();
else
_operationState.BeginRead();
_minimumReadBytes = 0;
}
internal ValueTaskSourceStatus GetFlushAsyncStatus()
{
if (_writerAwaitable.IsCompleted) {
if (_readerCompletion.IsFaulted)
return ValueTaskSourceStatus.Faulted;
return ValueTaskSourceStatus.Succeeded;
}
return ValueTaskSourceStatus.Pending;
}
internal FlushResult GetFlushAsyncResult()
{
FlushResult result = default(FlushResult);
CancellationToken cancellationToken = default(CancellationToken);
CancellationTokenRegistration cancellationTokenRegistration = default(CancellationTokenRegistration);
try {
lock (SyncObj) {
if (!_writerAwaitable.IsCompleted)
ThrowHelper.ThrowInvalidOperationException_GetResultNotCompleted();
GetFlushResult(ref result);
cancellationTokenRegistration = _writerAwaitable.ReleaseCancellationTokenRegistration(out cancellationToken);
return result;
}
} finally {
cancellationTokenRegistration.Dispose();
cancellationToken.ThrowIfCancellationRequested();
}
}
internal long GetUnflushedBytes()
{
return _unflushedBytes;
}
private void GetFlushResult(ref FlushResult result)
{
if (_writerAwaitable.ObserveCancellation())
result._resultFlags |= ResultFlags.Canceled;
if (_readerCompletion.IsCompletedOrThrow())
result._resultFlags |= ResultFlags.Completed;
}
[System.Runtime.CompilerServices.NullableContext(0)]
internal ValueTask<FlushResult> WriteAsync(ReadOnlyMemory<byte> source, CancellationToken cancellationToken)
{
if (_writerCompletion.IsCompleted)
ThrowHelper.ThrowInvalidOperationException_NoWritingAllowed();
if (_readerCompletion.IsCompletedOrThrow())
return new ValueTask<FlushResult>(new FlushResult(false, true));
if (cancellationToken.IsCancellationRequested)
return new ValueTask<FlushResult>(Task.FromCanceled<FlushResult>(cancellationToken));
CompletionData completionData;
ValueTask<FlushResult> result;
lock (SyncObj) {
AllocateWriteHeadIfNeeded(0);
if (source.Length <= _writingHeadMemory.Length) {
source.CopyTo(_writingHeadMemory);
AdvanceCore(source.Length);
} else
WriteMultiSegment(source.Span);
PrepareFlushUnsynchronized(out completionData, out result, cancellationToken);
}
TrySchedule(ReaderScheduler, ref completionData);
return result;
}
private void WriteMultiSegment(ReadOnlySpan<byte> source)
{
Span<byte> span = _writingHeadMemory.Span;
while (true) {
int num = Math.Min(span.Length, source.Length);
source.Slice(0, num).CopyTo(span);
source = source.Slice(num);
AdvanceCore(num);
if (source.Length == 0)
break;
_writingHead.End += _writingHeadBytesBuffered;
_writingHeadBytesBuffered = 0;
BufferSegment bufferSegment = AllocateSegment(0);
_writingHead.SetNext(bufferSegment);
_writingHead = bufferSegment;
span = _writingHeadMemory.Span;
}
}
[System.Runtime.CompilerServices.NullableContext(2)]
internal void OnFlushAsyncCompleted([System.Runtime.CompilerServices.Nullable(new byte[] {
1,
2
})] Action<object> continuation, object state, ValueTaskSourceOnCompletedFlags flags)
{
CompletionData completionData;
bool doubleCompletion;
lock (SyncObj) {
_writerAwaitable.OnCompleted(continuation, state, flags, out completionData, out doubleCompletion);
}
if (doubleCompletion)
Reader.Complete(ThrowHelper.CreateInvalidOperationException_NoConcurrentOperation());
TrySchedule(WriterScheduler, ref completionData);
}
private void ReaderCancellationRequested()
{
CompletionData completionData;
lock (SyncObj) {
_readerAwaitable.CancellationTokenFired(out completionData);
}
TrySchedule(ReaderScheduler, ref completionData);
}
private void WriterCancellationRequested()
{
CompletionData completionData;
lock (SyncObj) {
_writerAwaitable.CancellationTokenFired(out completionData);
}
TrySchedule(WriterScheduler, ref completionData);
}
public void Reset()
{
lock (SyncObj) {
if (!_disposed)
ThrowHelper.ThrowInvalidOperationException_ResetIncompleteReaderWriter();
_disposed = false;
ResetState();
}
}
}
}