<PackageReference Include="SSH.NET" Version="2025.1.0" />

PipeStream

public class PipeStream : Stream
PipeStream is a thread-safe read/write data stream for use between two threads in a single-producer/single-consumer type problem.
using System; using System.IO; using System.Net; using System.Runtime.CompilerServices; using System.Threading; using System.Threading.Tasks; namespace Renci.SshNet.Common { [System.Runtime.CompilerServices.NullableContext(1)] [System.Runtime.CompilerServices.Nullable(0)] public class PipeStream : Stream { private readonly object _sync = new object(); private System.Net.ArrayBuffer _buffer = new System.Net.ArrayBuffer(1024, false); private bool _disposed; public override bool CanRead => true; public override bool CanSeek => false; public override bool CanWrite => !_disposed; public override long Length { get { lock (_sync) { return _buffer.ActiveLength; } } } public override long Position { get { return 0; } set { throw new NotSupportedException(); } } public override long Seek(long offset, SeekOrigin origin) { throw new NotSupportedException(); } public override void SetLength(long value) { throw new NotSupportedException(); } public override int Read(byte[] buffer, int offset, int count) { ThrowHelper.ValidateBufferArguments(buffer, offset, count); return Read(buffer.AsSpan(offset, count)); } [System.Runtime.CompilerServices.NullableContext(0)] private new int Read(Span<byte> buffer) { lock (_sync) { while (_buffer.ActiveLength == 0 && !_disposed) { Monitor.Wait(_sync); } int num = Math.Min(buffer.Length, _buffer.ActiveLength); ReadOnlySpan<byte> readOnlySpan = _buffer.ActiveReadOnlySpan; readOnlySpan = readOnlySpan.Slice(0, num); readOnlySpan.CopyTo(buffer); _buffer.Discard(num); return num; } } public override void Write(byte[] buffer, int offset, int count) { ThrowHelper.ValidateBufferArguments(buffer, offset, count); lock (_sync) { WriteCore(buffer.AsSpan(offset, count)); } } public override void WriteByte(byte value) { lock (_sync) { WriteCore(new ReadOnlySpan<byte>(new byte[1] { value })); } } [System.Runtime.CompilerServices.NullableContext(0)] private void WriteCore(ReadOnlySpan<byte> buffer) { ThrowHelper.ThrowObjectDisposedIf(_disposed, this); _buffer.EnsureAvailableSpace(buffer.Length); buffer.CopyTo(_buffer.AvailableSpan); _buffer.Commit(buffer.Length); Monitor.PulseAll(_sync); } public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) { ThrowHelper.ValidateBufferArguments(buffer, offset, count); return WriteAsync(buffer.AsMemory(offset, count), cancellationToken).AsTask(); } [System.Runtime.CompilerServices.NullableContext(0)] [AsyncStateMachine(typeof(<WriteAsync>d__11))] private new ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken) { <WriteAsync>d__11 stateMachine = default(<WriteAsync>d__11); stateMachine.<>t__builder = AsyncValueTaskMethodBuilder.Create(); stateMachine.<>4__this = this; stateMachine.buffer = buffer; stateMachine.cancellationToken = cancellationToken; stateMachine.<>1__state = -1; stateMachine.<>t__builder.Start(ref stateMachine); return stateMachine.<>t__builder.Task; } public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, [System.Runtime.CompilerServices.Nullable(2)] AsyncCallback callback, [System.Runtime.CompilerServices.Nullable(2)] object state) { return System.Threading.Tasks.TaskToAsyncResult.Begin(WriteAsync(buffer, offset, count), callback, state); } public override void EndWrite(IAsyncResult asyncResult) { System.Threading.Tasks.TaskToAsyncResult.End(asyncResult); } public override void Flush() { } public override Task FlushAsync(CancellationToken cancellationToken) { return Task.CompletedTask; } protected override void Dispose(bool disposing) { if (!disposing) base.Dispose(disposing); else { lock (_sync) { if (_disposed) return; _disposed = true; Monitor.PulseAll(_sync); } base.Dispose(disposing); } } } }