SftpFileStream
Exposes a Stream around a remote SFTP file, supporting
both synchronous and asynchronous read and write operations.
using Renci.SshNet.Common;
using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.IO;
using System.Net;
using System.Runtime.CompilerServices;
using System.Runtime.ExceptionServices;
using System.Threading;
using System.Threading.Tasks;
namespace Renci.SshNet.Sftp
{
[System.Runtime.CompilerServices.NullableContext(1)]
[System.Runtime.CompilerServices.Nullable(0)]
public sealed class SftpFileStream : Stream
{
[System.Runtime.CompilerServices.Nullable(0)]
private sealed class SftpFileReader : IDisposable
{
[System.Runtime.CompilerServices.Nullable(0)]
/// <summary>
/// Immutable record of one read request: where it reads, how much it asks for,
/// and the task that yields the returned bytes.
/// </summary>
private sealed class Request
{
    /// <summary>Creates a request record from its three components.</summary>
    /// <param name="offset">Offset the read was issued for.</param>
    /// <param name="count">Number of bytes requested.</param>
    /// <param name="task">Task producing the bytes of the read.</param>
    public Request(ulong offset, uint count, Task<byte[]> task)
    {
        this.Task = task;
        this.Count = count;
        this.Offset = offset;
    }

    /// <summary>Offset the read was issued for.</summary>
    public ulong Offset { get; }

    /// <summary>Number of bytes requested.</summary>
    public uint Count { get; }

    /// <summary>Task producing the bytes of the read.</summary>
    public Task<byte[]> Task { get; }
}
// Remote file handle passed to every read request.
private readonly byte[] _handle;
// Session used to issue the read requests.
private readonly ISftpSession _sftpSession;
// Copied from the constructor argument of the same name; consumed by the
// async read logic, which is not visible in this listing.
private readonly int _maxPendingReads;
// Optional file size supplied by the caller (null when not supplied).
private readonly ulong? _fileSize;
// Outstanding requests, keyed by the offset each one was issued for.
private readonly Dictionary<ulong, Request> _requests = new Dictionary<ulong, Request>();
// Its token is handed to every read request; cancelled and disposed in Dispose().
private readonly CancellationTokenSource _cts;
// Initialised from the chunkSize constructor argument.
private uint _chunkSize;
// Both offsets start at the position given to the constructor.
// NOTE(review): presumably _offset is the next offset to hand to the caller and
// _readAheadOffset the next offset to request - confirm against the ReadAsync body.
private ulong _offset;
private ulong _readAheadOffset;
// Starts at the initialMaxRequests constructor argument.
private int _currentMaxRequests;
// Set to a captured ObjectDisposedException by Dispose() if nothing was stored before.
[System.Runtime.CompilerServices.Nullable(2)]
private ExceptionDispatchInfo _exception;
/// <summary>
/// Creates a reader over the given handle, starting at <paramref name="position"/>.
/// </summary>
/// <param name="handle">Remote file handle used for every read request.</param>
/// <param name="sftpSession">Session the read requests are issued on.</param>
/// <param name="chunkSize">Initial chunk size; stored as an unsigned value.</param>
/// <param name="position">Starting offset for both the read and read-ahead offsets.</param>
/// <param name="maxPendingReads">Stored as the pending-read limit.</param>
/// <param name="fileSize">Optional known file size.</param>
/// <param name="initialMaxRequests">Initial value of the current request limit.</param>
public SftpFileReader(byte[] handle, ISftpSession sftpSession, int chunkSize, long position, int maxPendingReads, ulong? fileSize = default(ulong?), int initialMaxRequests = 1)
{
    // Both offsets begin at the same place.
    ulong startOffset = (ulong)position;

    _sftpSession = sftpSession;
    _handle = handle;
    _fileSize = fileSize;
    _maxPendingReads = maxPendingReads;
    _currentMaxRequests = initialMaxRequests;
    _chunkSize = (uint)chunkSize;
    _readAheadOffset = startOffset;
    _offset = startOffset;
    _cts = new CancellationTokenSource();
}
// Decompiler rendering of an async method that yields a byte array.
// This stub only builds the compiler-generated state machine, captures `this`
// and the cancellation token, starts it, and returns the builder's task.
// The method's actual logic lives in the generated type <ReadAsync>d__12,
// which is not shown in this listing.
[AsyncStateMachine(typeof(<ReadAsync>d__12))]
public Task<byte[]> ReadAsync(CancellationToken cancellationToken)
{
<ReadAsync>d__12 stateMachine = default(<ReadAsync>d__12);
stateMachine.<>t__builder = AsyncTaskMethodBuilder<byte[]>.Create();
stateMachine.<>4__this = this;
stateMachine.cancellationToken = cancellationToken;
// -1 is the "not started" state of a generated state machine.
stateMachine.<>1__state = -1;
stateMachine.<>t__builder.Start(ref stateMachine);
return stateMachine.<>t__builder.Task;
}
/// <summary>
/// Issues a read of <paramref name="count"/> bytes at <paramref name="offset"/>
/// and records it in the request table under that offset.
/// </summary>
private void AddRequest(ulong offset, uint count)
{
    // The reader-wide token is attached so Dispose() can cancel the read.
    Task<byte[]> pendingRead = _sftpSession.RequestReadAsync(_handle, offset, count, _cts.Token);
    Request entry = new Request(offset, count, pendingRead);
    _requests.Add(offset, entry);
}
/// <summary>
/// Marks the reader as disposed, cancels any outstanding read requests and
/// releases the cancellation token source.
/// </summary>
public void Dispose()
{
    // Keep an earlier stored failure if there is one; otherwise record disposal.
    if (_exception == null)
    {
        _exception = ExceptionDispatchInfo.Capture(new ObjectDisposedException(GetType().FullName));
    }

    if (_requests.Count != 0)
    {
        _cts.Cancel();

        foreach (Request pending in _requests.Values)
        {
            // Reading Task.Exception marks a fault as observed.
            // NOTE(review): presumably this keeps abandoned reads from surfacing
            // as unobserved task exceptions - confirm.
            _ = pending.Task.Exception;
        }

        _requests.Clear();
    }

    _cts.Dispose();
}
}
// Cap on pending read requests per reader; the value matches the
// maxPendingReads argument used when Read creates a reader.
private const int MaxPendingReads = 100;
// Session every request of this stream is sent through.
private readonly ISftpSession _session;
// Access flags that CanRead / CanWrite are derived from.
private readonly FileAccess _access;
// Whether seeking is allowed while the stream is not disposed.
private readonly bool _canSeek;
// Passed to SftpFileReader as its chunk size.
private readonly int _readBufferSize;
// Current reader; null until a read needs one and again after InvalidateReads().
[System.Runtime.CompilerServices.Nullable(2)]
private SftpFileReader _sftpFileReader;
// Unconsumed remainder of the most recently fetched chunk.
[System.Runtime.CompilerServices.Nullable(0)]
private ReadOnlyMemory<byte> _readBuffer;
// Bytes written but not yet sent; emptied by Flush().
private System.Net.ArrayBuffer _writeBuffer;
// Logical stream position; it already counts bytes still held in _writeBuffer.
private long _position;
// Backing field of the Timeout property.
private TimeSpan _timeout;
// Set in Dispose(bool); makes CanRead / CanSeek / CanWrite report false.
private bool _disposed;
/// <summary>
/// True while the stream is not disposed and was opened with read access.
/// </summary>
public override bool CanRead => !_disposed && (_access & FileAccess.Read) == FileAccess.Read;
/// <summary>
/// True while the stream is not disposed and was created as seekable.
/// </summary>
public override bool CanSeek => !_disposed && _canSeek;
/// <summary>
/// True while the stream is not disposed and was opened with write access.
/// </summary>
public override bool CanWrite => !_disposed && (_access & FileAccess.Write) == FileAccess.Write;
/// <summary>Always false: this stream reports that it cannot time out.</summary>
public override bool CanTimeout
{
    get { return false; }
}
/// <summary>
/// Size of the remote file, fetched from the server on every access.
/// Buffered writes are flushed first so that they are included in the size.
/// </summary>
public override long Length
{
    get
    {
        ThrowIfNotSeekable();
        Flush();

        var attributes = _session.RequestFStat(Handle);
        return attributes.Size;
    }
}
/// <summary>
/// Current position in the stream. Reading requires a seekable stream;
/// assigning is a seek from the beginning of the stream.
/// </summary>
public override long Position
{
    get
    {
        ThrowIfNotSeekable();
        long current = _position;
        return current;
    }
    set => Seek(value, SeekOrigin.Begin);
}
/// <summary>Path this stream was created with.</summary>
public string Name { get; }
/// <summary>Remote file handle used for all requests made by this stream.</summary>
public byte[] Handle { get; }
/// <summary>
/// Timeout value associated with this stream. The value is validated before
/// it is stored.
/// </summary>
[EditorBrowsable(EditorBrowsableState.Never)]
public TimeSpan Timeout
{
    get => _timeout;
    set
    {
        // Validate first so an invalid value never reaches the field.
        value.EnsureValidTimeout("Timeout");
        _timeout = value;
    }
}
/// <summary>
/// Initialises a stream over an already opened remote handle.
/// </summary>
/// <param name="session">Session used for all requests.</param>
/// <param name="path">Path exposed through <see cref="Name"/>.</param>
/// <param name="access">Access flags governing reads and writes.</param>
/// <param name="canSeek">Whether the stream supports seeking.</param>
/// <param name="readBufferSize">Chunk size handed to readers.</param>
/// <param name="writeBufferSize">Size of the write buffer.</param>
/// <param name="handle">Remote file handle.</param>
/// <param name="position">Initial stream position.</param>
/// <param name="initialReader">Optional reader to start with; may be null.</param>
private SftpFileStream(ISftpSession session, string path, FileAccess access, bool canSeek, int readBufferSize, int writeBufferSize, byte[] handle, long position, [System.Runtime.CompilerServices.Nullable(2)] SftpFileReader initialReader)
{
    // Identity of the remote file.
    Name = path;
    Handle = handle;

    // Session and capabilities.
    _session = session;
    _access = access;
    _canSeek = canSeek;

    // Buffering state.
    _readBufferSize = readBufferSize;
    _writeBuffer = new System.Net.ArrayBuffer(writeBufferSize, false);
    _sftpFileReader = initialReader;
    _position = position;

    // Default timeout of 30 seconds.
    Timeout = TimeSpan.FromSeconds(30);
}
/// <summary>
/// Opens a stream synchronously by running the shared open routine in its
/// non-async mode and blocking on the result.
/// </summary>
internal static SftpFileStream Open([System.Runtime.CompilerServices.Nullable(2)] ISftpSession session, string path, FileMode mode, FileAccess access, int bufferSize, bool isDownloadFile = false)
{
    // NOTE(review): with isAsync false the task presumably completes
    // synchronously, so this wait does not actually block - confirm.
    Task<SftpFileStream> opened = Open(session, path, mode, access, bufferSize, isDownloadFile, isAsync: false, cancellationToken: CancellationToken.None);
    return opened.GetAwaiter().GetResult();
}
/// <summary>
/// Opens a stream asynchronously by running the shared open routine in its
/// async mode with the caller's cancellation token.
/// </summary>
internal static Task<SftpFileStream> OpenAsync([System.Runtime.CompilerServices.Nullable(2)] ISftpSession session, string path, FileMode mode, FileAccess access, int bufferSize, CancellationToken cancellationToken, bool isDownloadFile = false)
    => Open(session, path, mode, access, bufferSize, isDownloadFile, isAsync: true, cancellationToken: cancellationToken);
// Decompiler rendering of the shared async open routine used by Open and OpenAsync.
// This stub only copies the arguments into the compiler-generated state machine,
// starts it, and returns the builder's task. The routine's actual logic lives in
// the generated type <Open>d__37, which is not shown in this listing.
[AsyncStateMachine(typeof(<Open>d__37))]
private static Task<SftpFileStream> Open([System.Runtime.CompilerServices.Nullable(2)] ISftpSession session, string path, FileMode mode, FileAccess access, int bufferSize, bool isDownloadFile, bool isAsync, CancellationToken cancellationToken)
{
<Open>d__37 stateMachine = default(<Open>d__37);
stateMachine.<>t__builder = AsyncTaskMethodBuilder<SftpFileStream>.Create();
stateMachine.session = session;
stateMachine.path = path;
stateMachine.mode = mode;
stateMachine.access = access;
stateMachine.bufferSize = bufferSize;
stateMachine.isDownloadFile = isDownloadFile;
stateMachine.isAsync = isAsync;
stateMachine.cancellationToken = cancellationToken;
// -1 is the "not started" state of a generated state machine.
stateMachine.<>1__state = -1;
stateMachine.<>t__builder.Start(ref stateMachine);
return stateMachine.<>t__builder.Task;
}
/// <summary>
/// Sends any bytes held in the write buffer to the server and empties the buffer.
/// Does nothing beyond the disposed check when the buffer is empty.
/// </summary>
public override void Flush()
{
    ThrowHelper.ThrowObjectDisposedIf(_disposed, this);

    int pending = _writeBuffer.ActiveLength;
    if (pending == 0)
    {
        return;
    }

    // _position already counts the buffered bytes, so the buffered data starts
    // `pending` bytes before it. The checked conversion rejects a negative offset.
    ulong serverOffset = checked((ulong)(_position - pending));

    using (AutoResetEvent wait = new AutoResetEvent(false))
    {
        var data = _writeBuffer.DangerousGetUnderlyingBuffer();
        var start = _writeBuffer.ActiveStartOffset;

        // NOTE(review): presumably RequestWrite waits on the handle before
        // returning, which is why the buffer can be discarded right after - confirm.
        _session.RequestWrite(Handle, serverOffset, data, start, pending, wait, null);
        _writeBuffer.Discard(pending);
    }
}
// Decompiler rendering of the async flush override.
// This stub only builds the compiler-generated state machine, captures `this`
// and the cancellation token, starts it, and returns the builder's task.
// The method's actual logic lives in the generated type <FlushAsync>d__39,
// which is not shown in this listing.
[AsyncStateMachine(typeof(<FlushAsync>d__39))]
public override Task FlushAsync(CancellationToken cancellationToken)
{
<FlushAsync>d__39 stateMachine = default(<FlushAsync>d__39);
stateMachine.<>t__builder = AsyncTaskMethodBuilder.Create();
stateMachine.<>4__this = this;
stateMachine.cancellationToken = cancellationToken;
// -1 is the "not started" state of a generated state machine.
stateMachine.<>1__state = -1;
stateMachine.<>t__builder.Start(ref stateMachine);
return stateMachine.<>t__builder.Task;
}
/// <summary>
/// Drops all read-side state: clears the buffered chunk and disposes the
/// current reader, if any, so the next read starts with a fresh reader.
/// </summary>
private void InvalidateReads()
{
    var reader = _sftpFileReader;

    _readBuffer = ReadOnlyMemory<byte>.Empty;

    if (reader != null)
    {
        reader.Dispose();
    }

    _sftpFileReader = null;
}
/// <summary>
/// Reads up to <paramref name="count"/> bytes into <paramref name="buffer"/>
/// starting at <paramref name="offset"/>, after validating the arguments.
/// </summary>
/// <returns>The number of bytes copied into the buffer.</returns>
public override int Read(byte[] buffer, int offset, int count)
{
    ThrowHelper.ValidateBufferArguments(buffer, offset, count);

    Span<byte> destination = buffer.AsSpan(offset, count);
    return Read(destination);
}
/// <summary>
/// Copies bytes from the buffered chunk into <paramref name="buffer"/>,
/// fetching the next chunk from the reader first when nothing is buffered.
/// </summary>
/// <param name="buffer">Destination for the bytes read.</param>
/// <returns>
/// The number of bytes copied, which may be less than the buffer length;
/// zero when the reader returned an empty chunk.
/// </returns>
[System.Runtime.CompilerServices.NullableContext(0)]
private new int Read(Span<byte> buffer)
{
    ThrowIfNotReadable();

    if (_readBuffer.IsEmpty)
    {
        if (_sftpFileReader == null)
        {
            // Push out pending writes before a new reader starts at _position.
            Flush();

            // Use the class constant for the pending-read limit rather than a
            // bare literal, and rely on the constructor defaults for the
            // optional fileSize / initialMaxRequests arguments.
            _sftpFileReader = new SftpFileReader(Handle, _session, _readBufferSize, _position, MaxPendingReads);
        }

        // Blocking wait on the reader's async read.
        _readBuffer = _sftpFileReader.ReadAsync(CancellationToken.None).GetAwaiter().GetResult();

        if (_readBuffer.IsEmpty)
        {
            // An empty chunk ends this reader; a later Read creates a new one.
            // NOTE(review): presumably an empty chunk signals end of file - confirm.
            _sftpFileReader.Dispose();
            _sftpFileReader = null;
        }
    }

    int copied = Math.Min(buffer.Length, _readBuffer.Length);
    _readBuffer.Span.Slice(0, copied).CopyTo(buffer);

    // Keep whatever was not consumed for the next call.
    _readBuffer = _readBuffer.Slice(copied);
    _position += copied;
    return copied;
}
/// <summary>
/// Asynchronously reads up to <paramref name="count"/> bytes into
/// <paramref name="buffer"/> starting at <paramref name="offset"/>, after
/// validating the arguments.
/// </summary>
/// <returns>A task yielding the number of bytes read.</returns>
public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
    ThrowHelper.ValidateBufferArguments(buffer, offset, count);

    Memory<byte> destination = buffer.AsMemory(offset, count);
    ValueTask<int> pending = ReadAsync(destination, cancellationToken);
    return pending.AsTask();
}
// Decompiler rendering of the memory-based async read.
// This stub only builds the compiler-generated state machine, captures `this`,
// the destination buffer and the cancellation token, starts it, and returns the
// builder's task. The method's actual logic lives in the generated type
// <ReadAsync>d__44, which is not shown in this listing.
[System.Runtime.CompilerServices.NullableContext(0)]
[AsyncStateMachine(typeof(<ReadAsync>d__44))]
private new ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken)
{
<ReadAsync>d__44 stateMachine = default(<ReadAsync>d__44);
stateMachine.<>t__builder = AsyncValueTaskMethodBuilder<int>.Create();
stateMachine.<>4__this = this;
stateMachine.buffer = buffer;
stateMachine.cancellationToken = cancellationToken;
// -1 is the "not started" state of a generated state machine.
stateMachine.<>1__state = -1;
stateMachine.<>t__builder.Start(ref stateMachine);
return stateMachine.<>t__builder.Task;
}
/// <summary>
/// Begin/End-style entry point for reading: starts the task-based read and
/// wraps it as an <see cref="IAsyncResult"/>.
/// </summary>
public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, [System.Runtime.CompilerServices.Nullable(2)] AsyncCallback callback, [System.Runtime.CompilerServices.Nullable(2)] object state)
{
    Task<int> pendingRead = ReadAsync(buffer, offset, count);
    return System.Threading.Tasks.TaskToAsyncResult.Begin(pendingRead, callback, state);
}
/// <summary>
/// Completes a read started with <see cref="BeginRead"/> and returns its result.
/// </summary>
public override int EndRead(IAsyncResult asyncResult)
    => System.Threading.Tasks.TaskToAsyncResult.End<int>(asyncResult);
/// <summary>
/// Writes <paramref name="count"/> bytes from <paramref name="buffer"/>
/// starting at <paramref name="offset"/>, after validating the arguments.
/// </summary>
public override void Write(byte[] buffer, int offset, int count)
{
    ThrowHelper.ValidateBufferArguments(buffer, offset, count);

    ReadOnlySpan<byte> source = buffer.AsSpan(offset, count);
    Write(source);
}
/// <summary>
/// Appends <paramref name="buffer"/> to the write buffer, flushing to the
/// server each time the write buffer fills up. Any read-side state is
/// discarded first.
/// </summary>
[System.Runtime.CompilerServices.NullableContext(0)]
private new void Write(ReadOnlySpan<byte> buffer)
{
    ThrowIfNotWriteable();
    InvalidateReads();

    ReadOnlySpan<byte> remaining = buffer;
    while (remaining.Length > 0)
    {
        // Take as much as still fits into the write buffer.
        int chunk = Math.Min(remaining.Length, _writeBuffer.AvailableLength);
        ReadOnlySpan<byte> head = remaining.Slice(0, chunk);

        head.CopyTo(_writeBuffer.AvailableSpan);
        _writeBuffer.Commit(chunk);
        _position += chunk;
        remaining = remaining.Slice(chunk);

        // A full buffer is sent right away to make room for the rest.
        if (_writeBuffer.AvailableLength == 0)
        {
            Flush();
        }
    }
}
/// <summary>
/// Writes a single byte through the span-based write path.
/// </summary>
/// <param name="value">The byte to write.</param>
public override void WriteByte(byte value)
{
    // A one-byte stack buffer avoids allocating a heap array for every byte
    // written. This is safe because Write copies the span into the write
    // buffer before returning and cannot retain it.
    ReadOnlySpan<byte> single = stackalloc byte[1] { value };
    Write(single);
}
/// <summary>
/// Asynchronously writes <paramref name="count"/> bytes from
/// <paramref name="buffer"/> starting at <paramref name="offset"/>, after
/// validating the arguments.
/// </summary>
/// <returns>A task that completes when the write has finished.</returns>
public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
    ThrowHelper.ValidateBufferArguments(buffer, offset, count);

    ReadOnlyMemory<byte> source = buffer.AsMemory(offset, count);
    ValueTask pending = WriteAsync(source, cancellationToken);
    return pending.AsTask();
}
// Decompiler rendering of the memory-based async write.
// This stub only builds the compiler-generated state machine, captures `this`,
// the source buffer and the cancellation token, starts it, and returns the
// builder's task. The method's actual logic lives in the generated type
// <WriteAsync>d__51, which is not shown in this listing.
[System.Runtime.CompilerServices.NullableContext(0)]
[AsyncStateMachine(typeof(<WriteAsync>d__51))]
private new ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken)
{
<WriteAsync>d__51 stateMachine = default(<WriteAsync>d__51);
stateMachine.<>t__builder = AsyncValueTaskMethodBuilder.Create();
stateMachine.<>4__this = this;
stateMachine.buffer = buffer;
stateMachine.cancellationToken = cancellationToken;
// -1 is the "not started" state of a generated state machine.
stateMachine.<>1__state = -1;
stateMachine.<>t__builder.Start(ref stateMachine);
return stateMachine.<>t__builder.Task;
}
/// <summary>
/// Begin/End-style entry point for writing: starts the task-based write and
/// wraps it as an <see cref="IAsyncResult"/>.
/// </summary>
public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, [System.Runtime.CompilerServices.Nullable(2)] AsyncCallback callback, [System.Runtime.CompilerServices.Nullable(2)] object state)
{
    Task pendingWrite = WriteAsync(buffer, offset, count);
    return System.Threading.Tasks.TaskToAsyncResult.Begin(pendingWrite, callback, state);
}
/// <summary>
/// Completes a write started with <see cref="BeginWrite"/>.
/// </summary>
public override void EndWrite(IAsyncResult asyncResult)
    => System.Threading.Tasks.TaskToAsyncResult.End(asyncResult);
/// <summary>
/// Moves the stream position. Pending writes are flushed first. Buffered read
/// data is kept when the target falls inside it and discarded otherwise.
/// </summary>
/// <param name="offset">Offset relative to <paramref name="origin"/>.</param>
/// <param name="origin">Reference point for the offset.</param>
/// <returns>The new position.</returns>
/// <exception cref="ArgumentOutOfRangeException">The origin is not a known value.</exception>
/// <exception cref="IOException">The resulting position would be negative.</exception>
public override long Seek(long offset, SeekOrigin origin)
{
    ThrowIfNotSeekable();
    Flush();

    long target;
    if (origin == SeekOrigin.Begin)
    {
        target = offset;
    }
    else if (origin == SeekOrigin.Current)
    {
        target = _position + offset;
    }
    else if (origin == SeekOrigin.End)
    {
        // The end of the file is taken from the server on every call.
        target = _session.RequestFStat(Handle).Size + offset;
    }
    else
    {
        throw new ArgumentOutOfRangeException("origin");
    }

    if (target < 0)
    {
        throw new IOException("An attempt was made to move the position before the beginning of the stream.");
    }

    // The buffered read data covers [bufferedStart, bufferedEnd]. A target equal
    // to bufferedEnd leaves an empty buffer but keeps the current reader.
    long bufferedStart = _position;
    long bufferedEnd = bufferedStart + _readBuffer.Length;
    bool withinBuffer = target >= bufferedStart && target <= bufferedEnd;

    if (!withinBuffer)
    {
        InvalidateReads();
    }
    else
    {
        _readBuffer = _readBuffer.Slice((int)(target - bufferedStart));
    }

    _position = target;
    return target;
}
/// <summary>
/// Sets the size of the remote file to <paramref name="value"/>. Requires a
/// writable, seekable stream. The position is pulled back when it would lie
/// beyond the new length.
/// </summary>
/// <param name="value">New file size; must not be negative.</param>
public override void SetLength(long value)
{
    ThrowHelper.ThrowIfNegative(value, "value");
    ThrowIfNotWriteable();
    ThrowIfNotSeekable();

    // Bring the server up to date and drop read data that may become stale.
    Flush();
    InvalidateReads();

    // Fetch the current attributes, change only the size, and send them back.
    var attributes = _session.RequestFStat(Handle);
    attributes.Size = value;
    _session.RequestFSetStat(Handle, attributes);

    _position = Math.Min(_position, value);
}
/// <summary>
/// Releases the stream. On an explicit dispose with an open session, pending
/// writes are flushed and the remote handle is closed. Local state is cleaned
/// up in every case, even when flushing or closing throws.
/// </summary>
/// <param name="disposing">True when called from an explicit dispose.</param>
protected override void Dispose(bool disposing)
{
    if (_disposed)
    {
        return;
    }

    try
    {
        bool canTalkToServer = disposing && _session.IsOpen;
        if (canTalkToServer)
        {
            try
            {
                Flush();
            }
            finally
            {
                // The session is checked again: it may no longer be open after the flush.
                if (_session.IsOpen)
                {
                    _session.RequestClose(Handle);
                }
            }
        }
    }
    finally
    {
        _disposed = true;
        InvalidateReads();
        base.Dispose(disposing);
    }
}
// Decompiler rendering of the async dispose method.
// This stub only builds the compiler-generated state machine, captures `this`,
// starts it, and returns the builder's task. The method's actual logic lives in
// the generated type <DisposeAsync>d__57, which is not shown in this listing.
[AsyncStateMachine(typeof(<DisposeAsync>d__57))]
internal new ValueTask DisposeAsync()
{
<DisposeAsync>d__57 stateMachine = default(<DisposeAsync>d__57);
stateMachine.<>t__builder = AsyncValueTaskMethodBuilder.Create();
stateMachine.<>4__this = this;
// -1 is the "not started" state of a generated state machine.
stateMachine.<>1__state = -1;
stateMachine.<>t__builder.Start(ref stateMachine);
return stateMachine.<>t__builder.Task;
}
// Guard for operations that need a seekable stream; does nothing when CanSeek is true.
private void ThrowIfNotSeekable()
{
if (!CanSeek) {
// CanSeek is also false once the stream is disposed, so the disposed case
// is reported first.
ThrowHelper.ThrowObjectDisposedIf(_disposed, this);
// Compiler-generated local function whose body is not shown in this listing.
// NOTE(review): presumably throws NotSupportedException - confirm.
<ThrowIfNotSeekable>g__Throw|58_0();
}
}
// Guard for operations that need a writable stream; does nothing when CanWrite is true.
private void ThrowIfNotWriteable()
{
if (!CanWrite) {
// CanWrite is also false once the stream is disposed, so the disposed case
// is reported first.
ThrowHelper.ThrowObjectDisposedIf(_disposed, this);
// Compiler-generated local function whose body is not shown in this listing.
// NOTE(review): presumably throws NotSupportedException - confirm.
<ThrowIfNotWriteable>g__Throw|59_0();
}
}
// Guard for operations that need a readable stream; does nothing when CanRead is true.
private void ThrowIfNotReadable()
{
if (!CanRead) {
// CanRead is also false once the stream is disposed, so the disposed case
// is reported first.
ThrowHelper.ThrowObjectDisposedIf(_disposed, this);
// Compiler-generated local function whose body is not shown in this listing.
// NOTE(review): presumably throws NotSupportedException - confirm.
<ThrowIfNotReadable>g__Throw|60_0();
}
}
}
}