PipeStream
PipeStream is a thread-safe read/write data stream for passing bytes between two threads
in a single-producer/single-consumer scenario.
using System;
using System.Diagnostics;
using System.IO;
using System.Runtime.CompilerServices;
using System.Threading;
namespace Renci.SshNet.Common
{
/// <summary>
/// An in-memory byte pipe: data passed to <see cref="Write(byte[], int, int)"/> is appended to an
/// internal buffer and handed out, in order, by <see cref="Read(byte[], int, int)"/>.
/// All buffer state is guarded by a single private monitor, and readers block on that
/// monitor while the pipe is empty and not yet disposed.
/// </summary>
public class PipeStream : Stream
{
    /// <summary>Monitor guarding <see cref="_buffer"/>, <see cref="_head"/>, <see cref="_tail"/> and <see cref="_disposed"/>.</summary>
    private readonly object _sync = new object();

    /// <summary>Backing storage; unread data occupies the index range [<see cref="_head"/>, <see cref="_tail"/>).</summary>
    private byte[] _buffer = new byte[1024];

    /// <summary>Index of the next byte to be read.</summary>
    private int _head;

    /// <summary>Index one past the last byte written.</summary>
    private int _tail;

    private bool _disposed;

    /// <summary>Always <see langword="true"/>; reads after disposal drain remaining data and then return 0.</summary>
    public override bool CanRead => true;

    /// <summary>Always <see langword="false"/>; the stream is forward-only.</summary>
    public override bool CanSeek => false;

    /// <summary><see langword="true"/> until the stream has been disposed.</summary>
    public override bool CanWrite => !_disposed;

    /// <summary>Gets the number of bytes that have been written but not yet read.</summary>
    public override long Length
    {
        get
        {
            lock (_sync)
            {
                AssertValid();
                return _tail - _head;
            }
        }
    }

    /// <summary>Getting always yields 0; setting is not supported.</summary>
    /// <exception cref="NotSupportedException">The property is set.</exception>
    public override long Position
    {
        get => 0;
        set => throw new NotSupportedException();
    }

    /// <summary>
    /// Debug-only check of the buffer invariants. Must be called while holding <see cref="_sync"/>.
    /// </summary>
    [Conditional("DEBUG")]
    private void AssertValid()
    {
        Debug.Assert(Monitor.IsEntered(_sync), "Should be in lock on _sync");
        Debug.Assert(_head >= 0, "_head should be non-negative");
        Debug.Assert(_tail <= _buffer.Length, "_tail should not exceed the buffer length");
        Debug.Assert(_head <= _tail, "_head should not be past _tail");
    }

    /// <summary>Does nothing; written data is immediately available to readers.</summary>
    public override void Flush()
    {
    }

    /// <summary>Not supported.</summary>
    /// <exception cref="NotSupportedException">Always thrown.</exception>
    public override long Seek(long offset, SeekOrigin origin)
    {
        throw new NotSupportedException();
    }

    /// <summary>Not supported.</summary>
    /// <exception cref="NotSupportedException">Always thrown.</exception>
    public override void SetLength(long value)
    {
        throw new NotSupportedException();
    }

    /// <summary>
    /// Copies up to <paramref name="count"/> pending bytes into <paramref name="buffer"/>,
    /// blocking while the pipe is empty and has not been disposed.
    /// </summary>
    /// <param name="buffer">Destination array.</param>
    /// <param name="offset">Index in <paramref name="buffer"/> at which to start storing data.</param>
    /// <param name="count">Maximum number of bytes to read.</param>
    /// <returns>
    /// The number of bytes copied. This is 0 only when <paramref name="count"/> is 0 or when the
    /// stream has been disposed and no data remains.
    /// </returns>
    /// <exception cref="ArgumentNullException"><paramref name="buffer"/> is <see langword="null"/>.</exception>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="offset"/> or <paramref name="count"/> is negative.</exception>
    /// <exception cref="ArgumentException"><paramref name="offset"/> plus <paramref name="count"/> exceeds the length of <paramref name="buffer"/>.</exception>
    public override int Read(byte[] buffer, int offset, int count)
    {
        // Reject bad arguments before blocking, rather than after data finally arrives.
        ValidateArguments(buffer, offset, count);

        lock (_sync)
        {
            // Woken by Write (data available) or Dispose (no more data will come).
            while (_head == _tail && !_disposed)
            {
                Monitor.Wait(_sync);
            }

            AssertValid();

            int bytesRead = Math.Min(count, _tail - _head);
            Buffer.BlockCopy(_buffer, _head, buffer, offset, bytesRead);
            _head += bytesRead;

            AssertValid();
            return bytesRead;
        }
    }

    /// <summary>
    /// Appends <paramref name="count"/> bytes from <paramref name="buffer"/> to the pipe and wakes
    /// any threads blocked in <see cref="Read(byte[], int, int)"/>.
    /// </summary>
    /// <param name="buffer">Source array.</param>
    /// <param name="offset">Index in <paramref name="buffer"/> of the first byte to write.</param>
    /// <param name="count">Number of bytes to write.</param>
    /// <exception cref="ArgumentNullException"><paramref name="buffer"/> is <see langword="null"/>.</exception>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="offset"/> or <paramref name="count"/> is negative.</exception>
    /// <exception cref="ArgumentException"><paramref name="offset"/> plus <paramref name="count"/> exceeds the length of <paramref name="buffer"/>.</exception>
    /// <exception cref="ObjectDisposedException">The stream has been disposed.</exception>
    public override void Write(byte[] buffer, int offset, int count)
    {
        // Validate before touching any state: the compaction/growth path below rearranges the
        // internal buffer before copying from the caller's array, so a copy failure at that
        // point would leave _head/_tail describing the old layout.
        ValidateArguments(buffer, offset, count);

        lock (_sync)
        {
            ThrowIfDisposed();
            AssertValid();

            if (_buffer.Length - _tail >= count)
            {
                // Enough free space after the tail: append in place.
                Buffer.BlockCopy(buffer, offset, _buffer, _tail, count);
                _tail += count;
            }
            else
            {
                int pending = _tail - _head;
                int required = pending + count;

                if (required <= _buffer.Length)
                {
                    // Total fits once the already-consumed prefix is reclaimed: slide pending data to the front.
                    Buffer.BlockCopy(_buffer, _head, _buffer, 0, pending);
                }
                else
                {
                    // Grow to at least double the current size so repeated writes stay amortized.
                    byte[] grown = new byte[Math.Max(required, _buffer.Length * 2)];
                    Buffer.BlockCopy(_buffer, _head, grown, 0, pending);
                    _buffer = grown;
                }

                Buffer.BlockCopy(buffer, offset, _buffer, pending, count);
                _head = 0;
                _tail = required;
            }

            AssertValid();
            Monitor.PulseAll(_sync);
        }
    }

    /// <summary>
    /// Marks the stream as disposed and wakes any blocked readers so they can drain the
    /// remaining data and then observe end-of-stream.
    /// </summary>
    /// <param name="disposing"><see langword="true"/> when called from <see cref="IDisposable.Dispose"/>.</param>
    protected override void Dispose(bool disposing)
    {
        if (!disposing)
        {
            base.Dispose(disposing);
            return;
        }

        lock (_sync)
        {
            if (_disposed)
            {
                return;
            }

            _disposed = true;
            Monitor.PulseAll(_sync);
        }

        base.Dispose(disposing);
    }

    /// <summary>Throws <see cref="ObjectDisposedException"/> if the stream has been disposed.</summary>
    private void ThrowIfDisposed()
    {
        if (_disposed)
        {
            throw new ObjectDisposedException(GetType().FullName);
        }
    }

    /// <summary>
    /// Checks a (buffer, offset, count) triple. The exception types match those
    /// <see cref="Buffer.BlockCopy"/> raises for the same mistakes.
    /// </summary>
    private static void ValidateArguments(byte[] buffer, int offset, int count)
    {
        if (buffer is null)
        {
            throw new ArgumentNullException(nameof(buffer));
        }

        if (offset < 0)
        {
            throw new ArgumentOutOfRangeException(nameof(offset), "Non-negative number required.");
        }

        if (count < 0)
        {
            throw new ArgumentOutOfRangeException(nameof(count), "Non-negative number required.");
        }

        // Written as a subtraction so that offset + count cannot overflow.
        if (buffer.Length - offset < count)
        {
            throw new ArgumentException("Offset and count exceed the bounds of the buffer.");
        }
    }
}
}