SftpFileStream
Exposes a System.IO.Stream around a remote SFTP file, supporting both synchronous and asynchronous read and write operations.
using Renci.SshNet.Common;
using System;
using System.IO;
using System.Threading;
namespace Renci.SshNet.Sftp
{
public class SftpFileStream : Stream
{
private byte[] _handle;
private readonly bool _ownsHandle;
private readonly bool _isAsync;
private ISftpSession _session;
private readonly int _readBufferSize;
private readonly byte[] _readBuffer;
private readonly int _writeBufferSize;
private readonly byte[] _writeBuffer;
private int _bufferPosition;
private int _bufferLen;
private long _position;
private bool _bufferOwnedByWrite;
private bool _canRead;
private bool _canSeek;
private bool _canWrite;
private ulong _serverFilePosition;
private SftpFileAttributes _attributes;
private readonly object _lock = new object();
public override bool CanRead => _canRead;
public override bool CanSeek => _canSeek;
public override bool CanWrite => _canWrite;
public override bool CanTimeout => true;
public override long Length {
get {
lock (_lock) {
CheckSessionIsOpen();
if (!CanSeek)
throw new NotSupportedException("Seek operation is not supported.");
if (_bufferOwnedByWrite)
FlushWriteBuffer();
_attributes = _session.RequestFStat(_handle);
if (_attributes == null || _attributes.Size <= -1)
throw new IOException("Seek operation failed.");
return _attributes.Size;
}
}
}
public override long Position {
get {
CheckSessionIsOpen();
if (!CanSeek)
throw new NotSupportedException("Seek operation not supported.");
return _position;
}
set {
Seek(value, SeekOrigin.Begin);
}
}
public virtual bool IsAsync => _isAsync;
public string Name { get; set; }
public virtual byte[] Handle {
get {
Flush();
return _handle;
}
}
public TimeSpan Timeout { get; set; }
internal SftpFileStream(ISftpSession session, string path, FileMode mode, FileAccess access, int bufferSize)
: this(session, path, mode, access, bufferSize, false)
{
}
internal SftpFileStream(ISftpSession session, string path, FileMode mode, FileAccess access, int bufferSize, bool useAsync)
{
if (session == null)
throw new SshConnectionException("Client not connected.");
if (path == null)
throw new ArgumentNullException("path");
if (bufferSize <= 0)
throw new ArgumentOutOfRangeException("bufferSize");
if (access < FileAccess.Read || access > FileAccess.ReadWrite)
throw new ArgumentOutOfRangeException("access");
if (mode < FileMode.CreateNew || mode > FileMode.Append)
throw new ArgumentOutOfRangeException("mode");
Timeout = TimeSpan.FromSeconds(30);
Name = path;
_session = session;
_ownsHandle = true;
_isAsync = useAsync;
_bufferPosition = 0;
_bufferLen = 0;
_bufferOwnedByWrite = false;
_canRead = ((access & FileAccess.Read) != (FileAccess)0);
_canSeek = true;
_canWrite = ((access & FileAccess.Write) != (FileAccess)0);
_position = 0;
_serverFilePosition = 0;
Flags flags = Flags.None;
switch (access) {
case FileAccess.Read:
flags |= Flags.Read;
break;
case FileAccess.Write:
flags |= Flags.Write;
break;
case FileAccess.ReadWrite:
flags |= Flags.Read;
flags |= Flags.Write;
break;
}
switch (mode) {
case FileMode.Append:
flags |= Flags.Append;
break;
case FileMode.Create:
_handle = _session.RequestOpen(path, flags | Flags.Truncate, true);
flags = ((_handle != null) ? (flags | Flags.Truncate) : (flags | Flags.CreateNew));
break;
case FileMode.CreateNew:
flags |= Flags.CreateNew;
break;
case FileMode.OpenOrCreate:
flags |= Flags.CreateNewOrOpen;
break;
case FileMode.Truncate:
flags |= Flags.Truncate;
break;
}
if (_handle == null)
_handle = _session.RequestOpen(path, flags, false);
_attributes = _session.RequestFStat(_handle);
_readBufferSize = (int)session.CalculateOptimalReadLength((uint)bufferSize);
_readBuffer = new byte[_readBufferSize];
_writeBufferSize = (int)session.CalculateOptimalWriteLength((uint)bufferSize, _handle);
_writeBuffer = new byte[_writeBufferSize];
if (mode == FileMode.Append) {
_position = _attributes.Size;
_serverFilePosition = (ulong)_attributes.Size;
}
}
~SftpFileStream()
{
Dispose(false);
}
public override void Flush()
{
lock (_lock) {
CheckSessionIsOpen();
if (_bufferOwnedByWrite)
FlushWriteBuffer();
else
FlushReadBuffer();
}
}
public override int Read(byte[] buffer, int offset, int count)
{
int num = 0;
if (buffer == null)
throw new ArgumentNullException("buffer");
if (offset < 0)
throw new ArgumentOutOfRangeException("offset");
if (count < 0)
throw new ArgumentOutOfRangeException("count");
if (buffer.Length - offset < count)
throw new ArgumentException("Invalid array range.");
lock (_lock) {
CheckSessionIsOpen();
SetupRead();
while (true) {
if (count <= 0)
return num;
int num2 = _bufferLen - _bufferPosition;
if (num2 <= 0) {
_bufferPosition = 0;
byte[] array = _session.RequestRead(_handle, (ulong)_position, (uint)_readBufferSize);
_bufferLen = array.Length;
Buffer.BlockCopy(array, 0, _readBuffer, 0, _bufferLen);
_serverFilePosition = (ulong)_position;
if (_bufferLen < 0) {
_bufferLen = 0;
throw new IOException("Read operation failed.");
}
if (_bufferLen == 0)
break;
num2 = _bufferLen;
}
if (num2 > count)
num2 = count;
Buffer.BlockCopy(_readBuffer, _bufferPosition, buffer, offset, num2);
num += num2;
offset += num2;
count -= num2;
_bufferPosition += num2;
_position += num2;
}
return num;
}
}
public override int ReadByte()
{
lock (_lock) {
CheckSessionIsOpen();
SetupRead();
if (_bufferPosition >= _bufferLen) {
_bufferPosition = 0;
byte[] array = _session.RequestRead(_handle, (ulong)_position, (uint)_readBufferSize);
_bufferLen = array.Length;
Buffer.BlockCopy(array, 0, _readBuffer, 0, _readBufferSize);
_serverFilePosition = (ulong)_position;
if (_bufferLen < 0) {
_bufferLen = 0;
throw new IOException("Read operation failed.");
}
if (_bufferLen == 0)
return -1;
}
_position++;
return _readBuffer[_bufferPosition++];
}
}
public override long Seek(long offset, SeekOrigin origin)
{
long num = -1;
lock (_lock) {
CheckSessionIsOpen();
if (!CanSeek)
throw new NotSupportedException("Seek is not supported.");
if (origin == SeekOrigin.Begin && offset == _position)
return offset;
if (origin == SeekOrigin.Current && offset == 0)
return _position;
_attributes = _session.RequestFStat(_handle);
if (_bufferOwnedByWrite) {
FlushWriteBuffer();
switch (origin) {
case SeekOrigin.Begin:
num = offset;
break;
case SeekOrigin.Current:
num = _position + offset;
break;
case SeekOrigin.End:
num = _attributes.Size - offset;
break;
}
if (num == -1)
throw new EndOfStreamException("End of stream.");
_position = num;
_serverFilePosition = (ulong)num;
} else {
switch (origin) {
case SeekOrigin.Begin:
num = _position - _bufferPosition;
if (offset >= num && offset < num + _bufferLen) {
_bufferPosition = (int)(offset - num);
_position = offset;
return _position;
}
break;
case SeekOrigin.Current:
num = _position + offset;
if (num >= _position - _bufferPosition && num < _position - _bufferPosition + _bufferLen) {
_bufferPosition = (int)(num - (_position - _bufferPosition));
_position = num;
return _position;
}
break;
}
_bufferPosition = 0;
_bufferLen = 0;
switch (origin) {
case SeekOrigin.Begin:
num = offset;
break;
case SeekOrigin.Current:
num = _position + offset;
break;
case SeekOrigin.End:
num = _attributes.Size - offset;
break;
}
if (num < 0)
throw new EndOfStreamException();
_position = num;
}
return _position;
}
}
public override void SetLength(long value)
{
if (value < 0)
throw new ArgumentOutOfRangeException("value");
lock (_lock) {
CheckSessionIsOpen();
if (!CanSeek)
throw new NotSupportedException("Seek is not supported.");
SetupWrite();
_attributes.Size = value;
_session.RequestFSetStat(_handle, _attributes);
}
}
public override void Write(byte[] buffer, int offset, int count)
{
if (buffer == null)
throw new ArgumentNullException("buffer");
if (offset < 0)
throw new ArgumentOutOfRangeException("offset");
if (count < 0)
throw new ArgumentOutOfRangeException("count");
if (buffer.Length - offset < count)
throw new ArgumentException("Invalid array range.");
lock (_lock) {
CheckSessionIsOpen();
SetupWrite();
while (count > 0) {
int num = _writeBufferSize - _bufferPosition;
if (num <= 0) {
using (AutoResetEvent wait = new AutoResetEvent(false)) {
_session.RequestWrite(_handle, _serverFilePosition, _writeBuffer, _bufferPosition, wait, null);
_serverFilePosition += (ulong)_bufferPosition;
}
_bufferPosition = 0;
num = _writeBufferSize;
}
if (num > count)
num = count;
if (_bufferPosition == 0 && num == _writeBufferSize) {
using (AutoResetEvent wait2 = new AutoResetEvent(false)) {
_session.RequestWrite(_handle, _serverFilePosition, buffer, num, wait2, null);
_serverFilePosition += (ulong)num;
}
} else {
Buffer.BlockCopy(buffer, offset, _writeBuffer, _bufferPosition, num);
_bufferPosition += num;
}
_position += num;
offset += num;
count -= num;
}
if (_bufferPosition >= _writeBufferSize) {
using (AutoResetEvent wait3 = new AutoResetEvent(false)) {
_session.RequestWrite(_handle, _serverFilePosition, _writeBuffer, _bufferPosition, wait3, null);
_serverFilePosition += (ulong)_bufferPosition;
}
_bufferPosition = 0;
}
}
}
public override void WriteByte(byte value)
{
lock (_lock) {
CheckSessionIsOpen();
SetupWrite();
if (_bufferPosition >= _writeBufferSize) {
using (AutoResetEvent wait = new AutoResetEvent(false)) {
_session.RequestWrite(_handle, _serverFilePosition, _writeBuffer, _bufferPosition, wait, null);
_serverFilePosition += (ulong)_bufferPosition;
}
_bufferPosition = 0;
}
_writeBuffer[_bufferPosition++] = value;
_position++;
}
}
protected override void Dispose(bool disposing)
{
base.Dispose(disposing);
if (_session != null && disposing) {
lock (_lock) {
if (_session != null) {
_canRead = false;
_canSeek = false;
_canWrite = false;
if (_handle != null) {
if (_session.IsOpen) {
if (_bufferOwnedByWrite)
FlushWriteBuffer();
if (_ownsHandle)
_session.RequestClose(_handle);
}
_handle = null;
}
_session = null;
}
}
}
}
private void FlushReadBuffer()
{
if (_canSeek) {
if (_bufferPosition < _bufferLen)
_position -= _bufferPosition;
_bufferPosition = 0;
_bufferLen = 0;
}
}
private void FlushWriteBuffer()
{
if (_bufferPosition > 0) {
using (AutoResetEvent wait = new AutoResetEvent(false)) {
_session.RequestWrite(_handle, _serverFilePosition, _writeBuffer, _bufferPosition, wait, null);
_serverFilePosition += (ulong)_bufferPosition;
}
_bufferPosition = 0;
}
}
private void SetupRead()
{
if (!CanRead)
throw new NotSupportedException("Read not supported.");
if (_bufferOwnedByWrite) {
FlushWriteBuffer();
_bufferOwnedByWrite = false;
}
}
private void SetupWrite()
{
if (!CanWrite)
throw new NotSupportedException("Write not supported.");
if (!_bufferOwnedByWrite) {
FlushReadBuffer();
_bufferOwnedByWrite = true;
}
}
private void CheckSessionIsOpen()
{
if (_session == null)
throw new ObjectDisposedException(GetType().FullName);
if (!_session.IsOpen)
throw new ObjectDisposedException(GetType().FullName, "Cannot access a closed SFTP session.");
}
}
}