PipeStream
PipeStream is a thread-safe read/write data stream for use between two threads in a
single-producer/single-consumer type problem.
using System;
using System.IO;
using System.Net;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
namespace Renci.SshNet.Common
{
[System.Runtime.CompilerServices.NullableContext(1)]
[System.Runtime.CompilerServices.Nullable(0)]
public class PipeStream : Stream
{
private readonly object _sync = new object();
private System.Net.ArrayBuffer _buffer = new System.Net.ArrayBuffer(1024, false);
private bool _disposed;
public override bool CanRead => true;
public override bool CanSeek => false;
public override bool CanWrite => !_disposed;
public override long Length {
get {
lock (_sync) {
return _buffer.ActiveLength;
}
}
}
public override long Position {
get {
return 0;
}
set {
throw new NotSupportedException();
}
}
public override long Seek(long offset, SeekOrigin origin)
{
throw new NotSupportedException();
}
public override void SetLength(long value)
{
throw new NotSupportedException();
}
public override int Read(byte[] buffer, int offset, int count)
{
ThrowHelper.ValidateBufferArguments(buffer, offset, count);
return Read(buffer.AsSpan(offset, count));
}
[System.Runtime.CompilerServices.NullableContext(0)]
private new int Read(Span<byte> buffer)
{
lock (_sync) {
while (_buffer.ActiveLength == 0 && !_disposed) {
Monitor.Wait(_sync);
}
int num = Math.Min(buffer.Length, _buffer.ActiveLength);
ReadOnlySpan<byte> readOnlySpan = _buffer.ActiveReadOnlySpan;
readOnlySpan = readOnlySpan.Slice(0, num);
readOnlySpan.CopyTo(buffer);
_buffer.Discard(num);
return num;
}
}
public override void Write(byte[] buffer, int offset, int count)
{
ThrowHelper.ValidateBufferArguments(buffer, offset, count);
lock (_sync) {
WriteCore(buffer.AsSpan(offset, count));
}
}
public override void WriteByte(byte value)
{
lock (_sync) {
WriteCore(new ReadOnlySpan<byte>(new byte[1] {
value
}));
}
}
[System.Runtime.CompilerServices.NullableContext(0)]
private void WriteCore(ReadOnlySpan<byte> buffer)
{
ThrowHelper.ThrowObjectDisposedIf(_disposed, this);
_buffer.EnsureAvailableSpace(buffer.Length);
buffer.CopyTo(_buffer.AvailableSpan);
_buffer.Commit(buffer.Length);
Monitor.PulseAll(_sync);
}
public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
ThrowHelper.ValidateBufferArguments(buffer, offset, count);
return WriteAsync(buffer.AsMemory(offset, count), cancellationToken).AsTask();
}
[System.Runtime.CompilerServices.NullableContext(0)]
[AsyncStateMachine(typeof(<WriteAsync>d__11))]
private new ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken)
{
<WriteAsync>d__11 stateMachine = default(<WriteAsync>d__11);
stateMachine.<>t__builder = AsyncValueTaskMethodBuilder.Create();
stateMachine.<>4__this = this;
stateMachine.buffer = buffer;
stateMachine.cancellationToken = cancellationToken;
stateMachine.<>1__state = -1;
stateMachine.<>t__builder.Start(ref stateMachine);
return stateMachine.<>t__builder.Task;
}
public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, [System.Runtime.CompilerServices.Nullable(2)] AsyncCallback callback, [System.Runtime.CompilerServices.Nullable(2)] object state)
{
return System.Threading.Tasks.TaskToAsyncResult.Begin(WriteAsync(buffer, offset, count), callback, state);
}
public override void EndWrite(IAsyncResult asyncResult)
{
System.Threading.Tasks.TaskToAsyncResult.End(asyncResult);
}
public override void Flush()
{
}
public override Task FlushAsync(CancellationToken cancellationToken)
{
return Task.CompletedTask;
}
protected override void Dispose(bool disposing)
{
if (!disposing)
base.Dispose(disposing);
else {
lock (_sync) {
if (_disposed)
return;
_disposed = true;
Monitor.PulseAll(_sync);
}
base.Dispose(disposing);
}
}
}
}