<PackageReference Include="SSH.NET" Version="2023.0.0" />

PipeStream

public class PipeStream : Stream
PipeStream is a thread-safe read/write data stream for use between two threads in a single-producer/single-consumer type problem.
using System; using System.Collections.Generic; using System.Globalization; using System.IO; using System.Threading; namespace Renci.SshNet.Common { public class PipeStream : Stream { private readonly Queue<byte> _buffer = new Queue<byte>(); private bool _isFlushed; private long _maxBufferLength = 209715200; private bool _canBlockLastRead; private bool _isDisposed; public long MaxBufferLength { get { return _maxBufferLength; } set { _maxBufferLength = value; } } public bool BlockLastReadBuffer { get { if (_isDisposed) throw CreateObjectDisposedException(); return _canBlockLastRead; } set { if (_isDisposed) throw CreateObjectDisposedException(); _canBlockLastRead = value; if (!_canBlockLastRead) { lock (_buffer) { Monitor.Pulse(_buffer); } } } } public override bool CanRead => !_isDisposed; public override bool CanSeek => false; public override bool CanWrite => !_isDisposed; public override long Length { get { if (_isDisposed) throw CreateObjectDisposedException(); return _buffer.Count; } } public override long Position { get { return 0; } set { throw new NotSupportedException(); } } public override void Flush() { if (_isDisposed) throw CreateObjectDisposedException(); _isFlushed = true; lock (_buffer) { Monitor.Pulse(_buffer); } } public override long Seek(long offset, SeekOrigin origin) { throw new NotSupportedException(); } public override void SetLength(long value) { throw new NotSupportedException(); } public override int Read(byte[] buffer, int offset, int count) { if (offset != 0) throw new NotSupportedException("Offsets with value of non-zero are not supported"); if (buffer == null) throw new ArgumentNullException("buffer"); if (offset + count > buffer.Length) throw new ArgumentException("The sum of offset and count is greater than the buffer length."); if (offset < 0 || count < 0) throw new ArgumentOutOfRangeException("offset", "offset or count is negative."); if (BlockLastReadBuffer && count >= _maxBufferLength) throw new ArgumentException(string.Format(CultureInfo.CurrentCulture, "count({0}) > mMaxBufferLength({1})", count, _maxBufferLength)); if (_isDisposed) throw CreateObjectDisposedException(); if (count == 0) return 0; int i = 0; lock (_buffer) { while (!_isDisposed && !ReadAvailable(count)) { Monitor.Wait(_buffer); } if (!_isDisposed) { for (; i < count; i++) { if (_buffer.Count <= 0) break; buffer[i] = _buffer.Dequeue(); } Monitor.Pulse(_buffer); return i; } return 0; } } private bool ReadAvailable(int count) { long length = Length; if (_isFlushed || length >= count) { if (length < count + 1) return !BlockLastReadBuffer; return true; } return false; } public override void Write(byte[] buffer, int offset, int count) { if (buffer == null) throw new ArgumentNullException("buffer"); if (offset + count > buffer.Length) throw new ArgumentException("The sum of offset and count is greater than the buffer length."); if (offset < 0 || count < 0) throw new ArgumentOutOfRangeException("offset", "offset or count is negative."); if (_isDisposed) throw CreateObjectDisposedException(); if (count != 0) { lock (_buffer) { while (Length >= _maxBufferLength) { Monitor.Wait(_buffer); } _isFlushed = false; for (int i = offset; i < offset + count; i++) { _buffer.Enqueue(buffer[i]); } Monitor.Pulse(_buffer); } } } protected override void Dispose(bool disposing) { base.Dispose(disposing); if (!_isDisposed) { lock (_buffer) { _isDisposed = true; Monitor.Pulse(_buffer); } } } private ObjectDisposedException CreateObjectDisposedException() { return new ObjectDisposedException(GetType().FullName); } } }