SftpFileStream
Exposes a Stream around a remote SFTP file, supporting both synchronous and asynchronous read and write operations.
using Renci.SshNet.Common;
using System;
using System.Globalization;
using System.IO;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
namespace Renci.SshNet.Sftp
{
public class SftpFileStream : Stream
{
// Guards buffer and position state; the synchronous public members take this lock.
private readonly object _lock = new object();
// Chunk sizes computed in the constructors through the session's
// CalculateOptimalReadLength / CalculateOptimalWriteLength; they are also the sizes
// used when the read and write buffers are lazily allocated.
private readonly int _readBufferSize;
private readonly int _writeBufferSize;
// Remote file handle (returned by RequestOpen or passed to the private constructor);
// set to null when the stream is disposed.
private byte[] _handle;
// Session every request goes through; set to null when the stream is disposed.
private ISftpSession _session;
// Lazily created by GetOrCreateReadBuffer / GetOrCreateWriteBuffer.
private byte[] _readBuffer;
private byte[] _writeBuffer;
// Cursor into whichever buffer is currently active (read or write).
private int _bufferPosition;
// Number of valid bytes in the read buffer; reset to 0 by FlushReadBuffer.
private int _bufferLen;
// Logical position in the remote file; used to derive the offset of read/write requests.
private long _position;
// True while the buffer state belongs to the write path (toggled by SetupRead / SetupWrite).
private bool _bufferOwnedByWrite;
// Capability flags: derived from the requested FileAccess in the constructors
// (_canSeek is always set to true there) and cleared in Dispose.
private bool _canRead;
private bool _canSeek;
private bool _canWrite;
/// <summary>Gets a value indicating whether the stream currently supports reading.</summary>
public override bool CanRead
{
    get { return _canRead; }
}

/// <summary>Gets a value indicating whether the stream currently supports seeking.</summary>
public override bool CanSeek
{
    get { return _canSeek; }
}

/// <summary>Gets a value indicating whether the stream currently supports writing.</summary>
public override bool CanWrite
{
    get { return _canWrite; }
}

/// <summary>Gets a value indicating whether the stream can time out; always <c>true</c>.</summary>
public override bool CanTimeout
{
    get { return true; }
}
/// <summary>
/// Gets the size of the remote file as reported by an FSTAT request on the open handle.
/// Any pending buffered writes are flushed first.
/// </summary>
/// <exception cref="ObjectDisposedException">The stream or its session is closed.</exception>
/// <exception cref="NotSupportedException">The stream does not support seeking.</exception>
/// <exception cref="IOException">The FSTAT request returned no attributes.</exception>
public override long Length
{
    get
    {
        lock (_lock) {
            CheckSessionIsOpen();

            if (!CanSeek) {
                throw new NotSupportedException("Seek operation is not supported.");
            }

            // Push out buffered writes before asking for the file attributes.
            if (_bufferOwnedByWrite) {
                FlushWriteBuffer();
            }

            SftpFileAttributes attributes = _session.RequestFStat(_handle, true);
            if (attributes != null) {
                return attributes.Size;
            }

            throw new IOException("Seek operation failed.");
        }
    }
}
/// <summary>
/// Gets or sets the current position within the stream. Setting it is equivalent to
/// calling <see cref="Seek"/> with <see cref="SeekOrigin.Begin"/>.
/// </summary>
/// <exception cref="ObjectDisposedException">The stream or its session is closed.</exception>
/// <exception cref="NotSupportedException">The stream does not support seeking.</exception>
public override long Position
{
    get
    {
        CheckSessionIsOpen();

        if (CanSeek) {
            return _position;
        }

        throw new NotSupportedException("Seek operation not supported.");
    }
    set
    {
        Seek(value, SeekOrigin.Begin);
    }
}
// Name of the stream; both constructors assign it the remote path they were given.
// The setter is public, so callers may change it afterwards.
public string Name { get; set; }
/// <summary>
/// Gets the remote file handle. The stream is flushed before the handle is returned.
/// </summary>
/// <exception cref="ObjectDisposedException">The stream or its session is closed (raised by the flush).</exception>
public virtual byte[] Handle
{
    get
    {
        Flush();
        return _handle;
    }
}
// Operation timeout; both constructors initialize it to 30 seconds.
// NOTE(review): none of the members visible in this file read this value — confirm where it is consumed.
public TimeSpan Timeout { get; set; }
/// <summary>
/// Wraps an already opened remote handle. No argument validation is performed here.
/// </summary>
/// <param name="session">Session used for all subsequent requests.</param>
/// <param name="path">Remote path; stored in <see cref="Name"/>.</param>
/// <param name="access">Requested access; determines <see cref="CanRead"/> and <see cref="CanWrite"/>.</param>
/// <param name="bufferSize">Requested buffer size, passed to the session to compute the actual chunk sizes.</param>
/// <param name="handle">Handle of the opened remote file.</param>
/// <param name="position">Initial stream position.</param>
private SftpFileStream(ISftpSession session, string path, FileAccess access, int bufferSize, byte[] handle, long position)
{
    // FileAccess.Read and FileAccess.Write are single-bit flags, so a non-zero
    // mask result means the flag is present.
    bool readable = (access & FileAccess.Read) != 0;
    bool writable = (access & FileAccess.Write) != 0;
    uint requestedSize = (uint)bufferSize;

    Name = path;
    Timeout = TimeSpan.FromSeconds(30);

    _session = session;
    _handle = handle;
    _position = position;

    _canRead = readable;
    _canWrite = writable;
    _canSeek = true;

    _readBufferSize = (int)session.CalculateOptimalReadLength(requestedSize);
    _writeBufferSize = (int)session.CalculateOptimalWriteLength(requestedSize, _handle);
}
/// <summary>
/// Opens the remote file at <paramref name="path"/> according to <paramref name="mode"/> and
/// <paramref name="access"/>, and prepares the read/write chunk sizes.
/// </summary>
/// <param name="session">Session used for all requests.</param>
/// <param name="path">Remote path of the file; stored in <see cref="Name"/>.</param>
/// <param name="mode">How the file is opened or created.</param>
/// <param name="access">Requested access (read, write, or both).</param>
/// <param name="bufferSize">Requested buffer size; must be greater than zero.</param>
/// <exception cref="SshConnectionException"><paramref name="session"/> is <c>null</c>.</exception>
/// <exception cref="ArgumentNullException"><paramref name="path"/> is <c>null</c>.</exception>
/// <exception cref="ArgumentOutOfRangeException">
/// <paramref name="bufferSize"/> is not positive, or <paramref name="access"/> / <paramref name="mode"/>
/// is not a defined value.
/// </exception>
/// <exception cref="ArgumentException">
/// <paramref name="mode"/> is incompatible with <paramref name="access"/>.
/// </exception>
internal SftpFileStream(ISftpSession session, string path, FileMode mode, FileAccess access, int bufferSize)
{
    if (session == null) {
        throw new SshConnectionException("Client not connected.");
    }
    if (path == null) {
        throw new ArgumentNullException(nameof(path));
    }
    if (bufferSize <= 0) {
        throw new ArgumentOutOfRangeException(nameof(bufferSize), "Cannot be less than or equal to zero.");
    }

    Timeout = TimeSpan.FromSeconds(30);
    Name = path;
    _session = session;
    _canRead = (access & FileAccess.Read) == FileAccess.Read;
    _canSeek = true;
    _canWrite = (access & FileAccess.Write) == FileAccess.Write;

    Flags flags = Flags.None;
    switch (access) {
        case FileAccess.Read:
            flags |= Flags.Read;
            break;
        case FileAccess.Write:
            flags |= Flags.Write;
            break;
        case FileAccess.ReadWrite:
            flags |= Flags.Read;
            flags |= Flags.Write;
            break;
        default:
            throw new ArgumentOutOfRangeException(nameof(access));
    }

    // Append is only allowed with write-only access.
    if (_canRead && mode == FileMode.Append) {
        throw new ArgumentException(string.Format(CultureInfo.InvariantCulture, "{0} mode can be requested only when combined with write-only access.", mode.ToString("G")), nameof(mode));
    }

    // These modes create or modify the file and therefore need write access.
    bool modeRequiresWrite = mode == FileMode.CreateNew
        || mode == FileMode.Create
        || mode == FileMode.Truncate
        || mode == FileMode.Append;
    if (!_canWrite && modeRequiresWrite) {
        throw new ArgumentException(string.Format(CultureInfo.InvariantCulture, "Combining {0}: {1} with {2}: {3} is invalid.", "FileMode", mode, "FileAccess", access), nameof(mode));
    }

    switch (mode) {
        case FileMode.Append:
            flags |= Flags.Append | Flags.CreateNewOrOpen;
            break;
        case FileMode.Create:
            // First try to truncate an existing file (a failure yields a null handle
            // instead of an exception); if that did not produce a handle, fall back to
            // creating a new file in the common open call below.
            _handle = _session.RequestOpen(path, flags | Flags.Truncate, true);
            flags |= (_handle != null) ? Flags.Truncate : Flags.CreateNew;
            break;
        case FileMode.CreateNew:
            flags |= Flags.CreateNew;
            break;
        case FileMode.OpenOrCreate:
            flags |= Flags.CreateNewOrOpen;
            break;
        case FileMode.Truncate:
            flags |= Flags.Truncate;
            break;
        case FileMode.Open:
            break;
        default:
            throw new ArgumentOutOfRangeException(nameof(mode));
    }

    if (_handle == null) {
        _handle = _session.RequestOpen(path, flags, false);
    }

    _readBufferSize = (int)session.CalculateOptimalReadLength((uint)bufferSize);
    _writeBufferSize = (int)session.CalculateOptimalWriteLength((uint)bufferSize, _handle);

    // In append mode, start at the current end of the file.
    if (mode == FileMode.Append) {
        SftpFileAttributes attributes = _session.RequestFStat(_handle, false);
        _position = attributes.Size;
    }
}
// Asynchronous factory for SftpFileStream.
// The body below is compiler-generated async scaffolding as emitted by a decompiler: it
// copies the arguments into the <OpenAsync>d__39 state machine, starts it, and returns
// the task exposed by the method builder.
// NOTE(review): the state machine's MoveNext is not part of this file, so the actual open
// logic cannot be documented from here — presumably it mirrors the synchronous
// constructor; confirm against the original source.
[AsyncStateMachine(typeof(<OpenAsync>d__39))]
internal static Task<SftpFileStream> OpenAsync(ISftpSession session, string path, FileMode mode, FileAccess access, int bufferSize, CancellationToken cancellationToken)
{
<OpenAsync>d__39 stateMachine = default(<OpenAsync>d__39);
stateMachine.<>t__builder = AsyncTaskMethodBuilder<SftpFileStream>.Create();
stateMachine.session = session;
stateMachine.path = path;
stateMachine.mode = mode;
stateMachine.access = access;
stateMachine.bufferSize = bufferSize;
stateMachine.cancellationToken = cancellationToken;
// -1 is the initial "not started" state of the generated state machine.
stateMachine.<>1__state = -1;
stateMachine.<>t__builder.Start(ref stateMachine);
return stateMachine.<>t__builder.Task;
}
/// <summary>
/// Finalizer; forwards to <see cref="Dispose(bool)"/> with <c>disposing</c> set to <c>false</c>.
/// </summary>
~SftpFileStream() => Dispose(false);
/// <summary>
/// Flushes the active buffer: pending writes are sent to the server, or — when the buffer
/// belongs to the read path — the buffered read data is discarded.
/// </summary>
/// <exception cref="ObjectDisposedException">The stream or its session is closed.</exception>
public override void Flush()
{
    lock (_lock) {
        CheckSessionIsOpen();

        if (!_bufferOwnedByWrite) {
            FlushReadBuffer();
            return;
        }

        FlushWriteBuffer();
    }
}
/// <summary>
/// Asynchronous counterpart of <see cref="Flush"/>. Pending writes are flushed through
/// <c>FlushWriteBufferAsync</c>; a read buffer is simply discarded and a completed task
/// is returned. Unlike <see cref="Flush"/>, this method does not take the stream lock.
/// </summary>
/// <param name="cancellationToken">Token forwarded to the asynchronous write flush.</param>
/// <returns>A task representing the flush.</returns>
/// <exception cref="ObjectDisposedException">The stream or its session is closed (thrown synchronously).</exception>
public override Task FlushAsync(CancellationToken cancellationToken)
{
    CheckSessionIsOpen();

    if (!_bufferOwnedByWrite) {
        FlushReadBuffer();
        return Task.CompletedTask;
    }

    return FlushWriteBufferAsync(cancellationToken);
}
/// <summary>
/// Reads up to <paramref name="count"/> bytes into <paramref name="buffer"/>, serving them
/// from the internal read buffer when possible and otherwise requesting a chunk of
/// <c>_readBufferSize</c> bytes from the server.
/// </summary>
/// <param name="buffer">Destination array.</param>
/// <param name="offset">Index in <paramref name="buffer"/> at which to start storing data.</param>
/// <param name="count">Maximum number of bytes to read.</param>
/// <returns>
/// The number of bytes stored in <paramref name="buffer"/>; this may be less than
/// <paramref name="count"/>, and is 0 when no data could be read.
/// </returns>
/// <exception cref="ArgumentNullException"><paramref name="buffer"/> is <c>null</c>.</exception>
/// <exception cref="ArgumentOutOfRangeException"><paramref name="offset"/> or <paramref name="count"/> is negative.</exception>
/// <exception cref="ArgumentException">The range does not fit in <paramref name="buffer"/>.</exception>
/// <exception cref="ObjectDisposedException">The stream or its session is closed.</exception>
/// <exception cref="NotSupportedException">The stream does not support reading.</exception>
public override int Read(byte[] buffer, int offset, int count)
{
    int totalRead = 0;

    if (buffer == null) {
        throw new ArgumentNullException("buffer");
    }
    if (offset < 0) {
        throw new ArgumentOutOfRangeException("offset");
    }
    if (count < 0) {
        throw new ArgumentOutOfRangeException("count");
    }
    if (buffer.Length - offset < count) {
        throw new ArgumentException("Invalid array range.");
    }

    lock (_lock) {
        CheckSessionIsOpen();
        SetupRead();

        while (count > 0) {
            int buffered = _bufferLen - _bufferPosition;

            if (buffered > 0) {
                // Serve the request from data that is already buffered.
                int fromBuffer = Math.Min(buffered, count);
                Buffer.BlockCopy(GetOrCreateReadBuffer(), _bufferPosition, buffer, offset, fromBuffer);
                _bufferPosition += fromBuffer;
                _position += fromBuffer;
                totalRead += fromBuffer;
                offset += fromBuffer;
                count -= fromBuffer;
                continue;
            }

            byte[] data = _session.RequestRead(_handle, (ulong)_position, (uint)_readBufferSize);
            if (data.Length == 0) {
                // Nothing more to read: leave the read buffer empty and report what we have.
                _bufferPosition = 0;
                _bufferLen = 0;
                break;
            }

            // Hand as much as requested directly to the caller and keep any surplus
            // in the read buffer for the next call.
            int toCaller = Math.Min(count, data.Length);
            int surplus = data.Length - toCaller;
            if (surplus > 0) {
                Buffer.BlockCopy(data, toCaller, GetOrCreateReadBuffer(), 0, surplus);
            }
            _bufferPosition = 0;
            _bufferLen = surplus;

            Buffer.BlockCopy(data, 0, buffer, offset, toCaller);
            _position += toCaller;
            totalRead += toCaller;

            // A short response ends the loop; return what was read so far.
            if (data.Length < _readBufferSize) {
                break;
            }

            offset += toCaller;
            count -= toCaller;
        }

        return totalRead;
    }
}
// Asynchronous read entry point.
// The body below is compiler-generated async scaffolding as emitted by a decompiler: it
// copies `this` and the arguments into the <ReadAsync>d__44 state machine, starts it, and
// returns the task exposed by the method builder.
// NOTE(review): the state machine's MoveNext is not part of this file, so argument
// validation and buffering behavior cannot be documented from here — presumably it
// mirrors Read; confirm against the original source.
[AsyncStateMachine(typeof(<ReadAsync>d__44))]
public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
<ReadAsync>d__44 stateMachine = default(<ReadAsync>d__44);
stateMachine.<>t__builder = AsyncTaskMethodBuilder<int>.Create();
stateMachine.<>4__this = this;
stateMachine.buffer = buffer;
stateMachine.offset = offset;
stateMachine.count = count;
stateMachine.cancellationToken = cancellationToken;
// -1 is the initial "not started" state of the generated state machine.
stateMachine.<>1__state = -1;
stateMachine.<>t__builder.Start(ref stateMachine);
return stateMachine.<>t__builder.Task;
}
/// <summary>
/// Reads a single byte, refilling the read buffer from the server when it is exhausted.
/// </summary>
/// <returns>The byte read, or -1 when the server returned no data.</returns>
/// <exception cref="ObjectDisposedException">The stream or its session is closed.</exception>
/// <exception cref="NotSupportedException">The stream does not support reading.</exception>
public override int ReadByte()
{
    lock (_lock) {
        CheckSessionIsOpen();
        SetupRead();

        byte[] readBuffer;
        if (_bufferPosition < _bufferLen) {
            // Unread data is still available in the buffer.
            readBuffer = GetOrCreateReadBuffer();
        } else {
            byte[] data = _session.RequestRead(_handle, (ulong)_position, (uint)_readBufferSize);
            if (data.Length == 0) {
                return -1;
            }

            // Replace the buffer contents with the freshly read chunk.
            readBuffer = GetOrCreateReadBuffer();
            Buffer.BlockCopy(data, 0, readBuffer, 0, data.Length);
            _bufferPosition = 0;
            _bufferLen = data.Length;
        }

        _position++;
        return readBuffer[_bufferPosition++];
    }
}
/// <summary>
/// Moves the stream position. Pending writes are flushed first; in read mode a seek that
/// lands inside the currently buffered data only moves the buffer cursor, otherwise the
/// read buffer is discarded.
/// </summary>
/// <param name="offset">Offset relative to <paramref name="origin"/>.</param>
/// <param name="origin">Reference point for <paramref name="offset"/>.</param>
/// <returns>The new position within the stream.</returns>
/// <exception cref="ObjectDisposedException">The stream or its session is closed.</exception>
/// <exception cref="NotSupportedException">The stream does not support seeking.</exception>
/// <exception cref="ArgumentException"><paramref name="origin"/> is not a defined value.</exception>
/// <exception cref="EndOfStreamException">The resulting position would be negative.</exception>
public override long Seek(long offset, SeekOrigin origin)
{
    lock (_lock) {
        CheckSessionIsOpen();

        if (!CanSeek) {
            throw new NotSupportedException("Seek is not supported.");
        }

        // Seeks that do not move the position are no-ops.
        if (origin == SeekOrigin.Begin && offset == _position) {
            return offset;
        }
        if (origin == SeekOrigin.Current && offset == 0) {
            return _position;
        }

        if (_bufferOwnedByWrite) {
            FlushWriteBuffer();
        } else {
            if (origin == SeekOrigin.Begin || origin == SeekOrigin.Current) {
                // File offset that corresponds to index 0 of the read buffer.
                long bufferStart = _position - _bufferPosition;
                long target = (origin == SeekOrigin.Begin) ? offset : _position + offset;

                if (target >= bufferStart && target < bufferStart + _bufferLen) {
                    // The target is inside the buffered data: just move the cursor.
                    _bufferPosition = (int)(target - bufferStart);
                    _position = target;
                    return _position;
                }
            }

            // The target lies outside the buffered data (or is end-relative): drop the buffer.
            _bufferPosition = 0;
            _bufferLen = 0;
        }

        long newPosition;
        if (origin == SeekOrigin.Begin) {
            newPosition = offset;
        } else if (origin == SeekOrigin.Current) {
            newPosition = _position + offset;
        } else if (origin == SeekOrigin.End) {
            newPosition = _session.RequestFStat(_handle, false).Size + offset;
        } else {
            throw new ArgumentException("Invalid seek origin.", "origin");
        }

        if (newPosition < 0) {
            throw new EndOfStreamException();
        }

        _position = newPosition;
        return _position;
    }
}
/// <summary>
/// Sets the size of the remote file by reading its attributes, changing the size and
/// writing them back. The position is clamped to the new length.
/// </summary>
/// <param name="value">New length of the file in bytes.</param>
/// <exception cref="ArgumentOutOfRangeException"><paramref name="value"/> is negative.</exception>
/// <exception cref="ObjectDisposedException">The stream or its session is closed.</exception>
/// <exception cref="NotSupportedException">The stream does not support seeking or writing.</exception>
public override void SetLength(long value)
{
    if (value < 0) {
        throw new ArgumentOutOfRangeException("value");
    }

    lock (_lock) {
        CheckSessionIsOpen();

        if (!CanSeek) {
            throw new NotSupportedException("Seek is not supported.");
        }

        // Either switch the buffer to write mode (which also verifies write access)
        // or push out what is already pending.
        if (!_bufferOwnedByWrite) {
            SetupWrite();
        } else {
            FlushWriteBuffer();
        }

        SftpFileAttributes attributes = _session.RequestFStat(_handle, false);
        attributes.Size = value;
        _session.RequestFSetStat(_handle, attributes);

        _position = Math.Min(_position, value);
    }
}
/// <summary>
/// Writes <paramref name="count"/> bytes from <paramref name="buffer"/>. Chunks that fill a
/// whole write buffer while the buffer is empty are sent directly; smaller pieces are
/// accumulated in the write buffer, which is sent once it is full.
/// </summary>
/// <param name="buffer">Source array.</param>
/// <param name="offset">Index in <paramref name="buffer"/> at which to start reading data.</param>
/// <param name="count">Number of bytes to write.</param>
/// <exception cref="ArgumentNullException"><paramref name="buffer"/> is <c>null</c>.</exception>
/// <exception cref="ArgumentOutOfRangeException"><paramref name="offset"/> or <paramref name="count"/> is negative.</exception>
/// <exception cref="ArgumentException">The range does not fit in <paramref name="buffer"/>.</exception>
/// <exception cref="ObjectDisposedException">The stream or its session is closed.</exception>
/// <exception cref="NotSupportedException">The stream does not support writing.</exception>
public override void Write(byte[] buffer, int offset, int count)
{
    if (buffer == null) {
        throw new ArgumentNullException("buffer");
    }
    if (offset < 0) {
        throw new ArgumentOutOfRangeException("offset");
    }
    if (count < 0) {
        throw new ArgumentOutOfRangeException("count");
    }
    if (buffer.Length - offset < count) {
        throw new ArgumentException("Invalid array range.");
    }

    lock (_lock) {
        CheckSessionIsOpen();
        SetupWrite();

        while (count > 0) {
            int room = _writeBufferSize - _bufferPosition;
            if (room <= 0) {
                // The write buffer is full: send it before continuing.
                FlushWriteBuffer();
                room = _writeBufferSize;
            }

            int chunk = Math.Min(room, count);

            bool bypassBuffer = _bufferPosition == 0 && chunk == _writeBufferSize;
            if (bypassBuffer) {
                // A full-size chunk with an empty buffer goes straight to the server.
                using (AutoResetEvent completed = new AutoResetEvent(false)) {
                    _session.RequestWrite(_handle, (ulong)_position, buffer, offset, chunk, completed, null);
                }
            } else {
                Buffer.BlockCopy(buffer, offset, GetOrCreateWriteBuffer(), _bufferPosition, chunk);
                _bufferPosition += chunk;
            }

            _position += chunk;
            offset += chunk;
            count -= chunk;
        }

        // Send the buffer right away if the last chunk filled it completely.
        if (_bufferPosition >= _writeBufferSize) {
            using (AutoResetEvent completed = new AutoResetEvent(false)) {
                _session.RequestWrite(_handle, (ulong)(_position - _bufferPosition), GetOrCreateWriteBuffer(), 0, _bufferPosition, completed, null);
            }
            _bufferPosition = 0;
        }
    }
}
// Asynchronous write entry point.
// The body below is compiler-generated async scaffolding as emitted by a decompiler: it
// copies `this` and the arguments into the <WriteAsync>d__49 state machine, starts it, and
// returns the task exposed by the method builder.
// NOTE(review): the state machine's MoveNext is not part of this file, so argument
// validation and buffering behavior cannot be documented from here — presumably it
// mirrors Write; confirm against the original source.
[AsyncStateMachine(typeof(<WriteAsync>d__49))]
public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
<WriteAsync>d__49 stateMachine = default(<WriteAsync>d__49);
stateMachine.<>t__builder = AsyncTaskMethodBuilder.Create();
stateMachine.<>4__this = this;
stateMachine.buffer = buffer;
stateMachine.offset = offset;
stateMachine.count = count;
stateMachine.cancellationToken = cancellationToken;
// -1 is the initial "not started" state of the generated state machine.
stateMachine.<>1__state = -1;
stateMachine.<>t__builder.Start(ref stateMachine);
return stateMachine.<>t__builder.Task;
}
/// <summary>
/// Appends a single byte to the write buffer, sending the buffer to the server first
/// when it is already full.
/// </summary>
/// <param name="value">The byte to write.</param>
/// <exception cref="ObjectDisposedException">The stream or its session is closed.</exception>
/// <exception cref="NotSupportedException">The stream does not support writing.</exception>
public override void WriteByte(byte value)
{
    lock (_lock) {
        CheckSessionIsOpen();
        SetupWrite();

        byte[] pending = GetOrCreateWriteBuffer();

        bool bufferFull = _bufferPosition >= _writeBufferSize;
        if (bufferFull) {
            // Buffered bytes start at (_position - _bufferPosition) in the remote file.
            using (AutoResetEvent completed = new AutoResetEvent(false)) {
                _session.RequestWrite(_handle, (ulong)(_position - _bufferPosition), pending, 0, _bufferPosition, completed, null);
            }
            _bufferPosition = 0;
        }

        pending[_bufferPosition++] = value;
        _position++;
    }
}
/// <summary>
/// Releases the stream. When <paramref name="disposing"/> is <c>true</c> and the stream has
/// not been disposed yet, the capability flags are cleared and, if the session is still
/// open, pending writes are flushed and the remote handle is closed.
/// </summary>
/// <param name="disposing">
/// <c>true</c> when called from <c>Dispose()</c>; <c>false</c> when called from the finalizer,
/// in which case only the base implementation runs.
/// </param>
protected override void Dispose(bool disposing)
{
    base.Dispose(disposing);

    if (!disposing || _session == null) {
        return;
    }

    lock (_lock) {
        // Re-check under the lock: another thread may have disposed the stream meanwhile.
        if (_session == null) {
            return;
        }

        _canRead = false;
        _canSeek = false;
        _canWrite = false;

        if (_handle != null) {
            if (_session.IsOpen) {
                if (_bufferOwnedByWrite) {
                    FlushWriteBuffer();
                }

                _session.RequestClose(_handle);
            }

            _handle = null;
        }

        _session = null;
    }
}
/// <summary>
/// Returns the read buffer, allocating it with <c>_readBufferSize</c> bytes on first use.
/// </summary>
private byte[] GetOrCreateReadBuffer()
{
    return _readBuffer ?? (_readBuffer = new byte[_readBufferSize]);
}
/// <summary>
/// Returns the write buffer, allocating it with <c>_writeBufferSize</c> bytes on first use.
/// </summary>
private byte[] GetOrCreateWriteBuffer()
{
    return _writeBuffer ?? (_writeBuffer = new byte[_writeBufferSize]);
}
/// <summary>
/// Discards any buffered read data by resetting the buffer cursor and length to zero.
/// </summary>
private void FlushReadBuffer()
{
    _bufferLen = 0;
    _bufferPosition = 0;
}
/// <summary>
/// Sends the bytes accumulated in the write buffer to the server and resets the buffer
/// cursor. Does nothing when the buffer is empty.
/// </summary>
private void FlushWriteBuffer()
{
    if (_bufferPosition <= 0) {
        return;
    }

    // Buffered bytes start at (_position - _bufferPosition) in the remote file.
    ulong fileOffset = (ulong)(_position - _bufferPosition);
    using (AutoResetEvent completed = new AutoResetEvent(false)) {
        _session.RequestWrite(_handle, fileOffset, _writeBuffer, 0, _bufferPosition, completed, null);
    }

    _bufferPosition = 0;
}
// Asynchronous counterpart of FlushWriteBuffer, used by FlushAsync.
// The body below is compiler-generated async scaffolding as emitted by a decompiler: it
// copies `this` and the token into the <FlushWriteBufferAsync>d__56 state machine, starts
// it, and returns the task exposed by the method builder.
// NOTE(review): the state machine's MoveNext is not part of this file — presumably it
// sends the pending write buffer like FlushWriteBuffer does; confirm against the
// original source.
[AsyncStateMachine(typeof(<FlushWriteBufferAsync>d__56))]
private Task FlushWriteBufferAsync(CancellationToken cancellationToken)
{
<FlushWriteBufferAsync>d__56 stateMachine = default(<FlushWriteBufferAsync>d__56);
stateMachine.<>t__builder = AsyncTaskMethodBuilder.Create();
stateMachine.<>4__this = this;
stateMachine.cancellationToken = cancellationToken;
// -1 is the initial "not started" state of the generated state machine.
stateMachine.<>1__state = -1;
stateMachine.<>t__builder.Start(ref stateMachine);
return stateMachine.<>t__builder.Task;
}
/// <summary>
/// Prepares the stream for reading: verifies read access and, if the buffer currently
/// belongs to the write path, flushes pending writes and hands the buffer to the read path.
/// </summary>
/// <exception cref="NotSupportedException">The stream does not support reading.</exception>
private void SetupRead()
{
    if (!CanRead) {
        throw new NotSupportedException("Read not supported.");
    }

    if (!_bufferOwnedByWrite) {
        return;
    }

    FlushWriteBuffer();
    _bufferOwnedByWrite = false;
}
/// <summary>
/// Prepares the stream for writing: verifies write access and, if the buffer currently
/// belongs to the read path, discards buffered read data and hands the buffer to the write path.
/// </summary>
/// <exception cref="NotSupportedException">The stream does not support writing.</exception>
private void SetupWrite()
{
    if (!CanWrite) {
        throw new NotSupportedException("Write not supported.");
    }

    if (_bufferOwnedByWrite) {
        return;
    }

    FlushReadBuffer();
    _bufferOwnedByWrite = true;
}
/// <summary>
/// Throws when the stream can no longer be used: either it has been disposed (no session)
/// or the underlying session is not open.
/// </summary>
/// <exception cref="ObjectDisposedException">The stream is disposed or its session is closed.</exception>
private void CheckSessionIsOpen()
{
    if (_session == null) {
        throw new ObjectDisposedException(GetType().FullName);
    }

    if (_session.IsOpen) {
        return;
    }

    throw new ObjectDisposedException(GetType().FullName, "Cannot access a closed SFTP session.");
}
}
}