SftpFileReader
using Renci.SshNet.Abstractions;
using Renci.SshNet.Common;
using System;
using System.Collections.Generic;
using System.Globalization;
using System.Threading;
namespace Renci.SshNet.Sftp
{
/// <summary>
/// Sequentially reads a remote file through an <see cref="ISftpSession"/>, using a background
/// thread that issues asynchronous read requests ahead of the consumer.
/// </summary>
/// <remarks>
/// Completed read-ahead chunks are stored in a dictionary keyed by chunk index and handed out in
/// order by <see cref="Read"/>. A semaphore slot is acquired before each read-ahead request and
/// released when <see cref="Read"/> hands a chunk to the caller.
/// </remarks>
internal class SftpFileReader : ISftpFileReader, IDisposable
{
    /// <summary>
    /// State of a single read-ahead request: its position in the chunk sequence, the file
    /// offset it was requested at, and - once completed - the bytes that were returned.
    /// </summary>
    internal class BufferedRead
    {
        /// <summary>
        /// Gets or sets the zero-based sequence number of the chunk.
        /// </summary>
        public int ChunkIndex { get; set; }

        /// <summary>
        /// Gets or sets the bytes received for the chunk; only assigned by <see cref="Complete"/>.
        /// </summary>
        public byte[] Data { get; set; }

        /// <summary>
        /// Gets or sets the file offset at which the chunk was requested.
        /// </summary>
        public ulong Offset { get; set; }

        /// <summary>
        /// Initializes a new <see cref="BufferedRead"/> for a request that has not completed yet.
        /// </summary>
        /// <param name="chunkIndex">The sequence number of the chunk.</param>
        /// <param name="offset">The file offset the chunk is requested at.</param>
        public BufferedRead(int chunkIndex, ulong offset)
        {
            ChunkIndex = chunkIndex;
            Offset = offset;
        }

        /// <summary>
        /// Records the bytes that were received for this chunk.
        /// </summary>
        /// <param name="data">The received bytes.</param>
        public void Complete(byte[] data)
        {
            Data = data;
        }
    }

    /// <summary>
    /// Maximum time, in milliseconds, the read-ahead thread waits for a semaphore slot before
    /// it re-evaluates its loop conditions.
    /// </summary>
    private const int ReadAheadWaitTimeoutInMilliseconds = 1000;

    private readonly byte[] _handle;
    private readonly ISftpSession _sftpSession;
    private readonly uint _chunkSize;

    // Number of bytes already handed to the caller; i.e. the file offset of the next Read().
    private ulong _offset;
    private readonly long? _fileSize;

    // Completed read-ahead chunks, keyed by chunk index. Accessed under _readLock.
    private readonly Dictionary<int, BufferedRead> _queue;
    private readonly WaitHandle[] _waitHandles;
    private int _readAheadChunkIndex;
    private ulong _readAheadOffset;
    private readonly ManualResetEvent _readAheadCompleted;

    // Set to null once the instance has been disposed.
    private ManualResetEvent _disposingWaitHandle;
    private int _nextChunkIndex;

    // Set when a read-ahead request completed with zero bytes.
    private bool _endOfFileReceived;

    // Set when Read() has returned the zero-length (end-of-file) result to the caller.
    private bool _isEndOfFileRead;
    private readonly SemaphoreLight _semaphore;
    private readonly object _readLock;

    // Failure recorded by the read-ahead thread, a read callback, Read() or Dispose().
    private Exception _exception;

    /// <summary>
    /// Initializes a new <see cref="SftpFileReader"/> and immediately starts the read-ahead thread.
    /// </summary>
    /// <param name="handle">The handle of the remote file, passed to every session request.</param>
    /// <param name="sftpSession">The session used to issue read and close requests.</param>
    /// <param name="chunkSize">The number of bytes requested per read-ahead request.</param>
    /// <param name="maxPendingReads">The initial semaphore count and initial queue capacity.</param>
    /// <param name="fileSize">The size of the file, or <c>null</c> when it is not known.</param>
    public SftpFileReader(byte[] handle, ISftpSession sftpSession, uint chunkSize, int maxPendingReads, long? fileSize)
    {
        _handle = handle;
        _sftpSession = sftpSession;
        _chunkSize = chunkSize;
        _fileSize = fileSize;
        _semaphore = new SemaphoreLight(maxPendingReads);
        _queue = new Dictionary<int, BufferedRead>(maxPendingReads);
        _readLock = new object();
        _readAheadCompleted = new ManualResetEvent(false);
        _disposingWaitHandle = new ManualResetEvent(false);

        // The order matters: ContinueReadAhead() treats WaitAny result 0 as "disposing" and
        // result 1 as "semaphore slot available".
        _waitHandles = _sftpSession.CreateWaitHandleArray(_disposingWaitHandle, _semaphore.AvailableWaitHandle);

        StartReadAhead();
    }

    /// <summary>
    /// Returns the next sequential block of bytes of the file.
    /// </summary>
    /// <returns>
    /// The next bytes of the file, or a zero-length array when the end of the file is reached.
    /// </returns>
    /// <exception cref="ObjectDisposedException">The reader has been disposed.</exception>
    /// <exception cref="SshException">
    /// The end of the file was already returned by a previous call, or the catch-up read
    /// unexpectedly returned no data.
    /// </exception>
    /// <remarks>
    /// Any exception recorded by the read-ahead thread or a read callback is rethrown here.
    /// </remarks>
    public byte[] Read()
    {
        if (_disposingWaitHandle == null)
        {
            throw new ObjectDisposedException(GetType().FullName);
        }

        if (_exception != null)
        {
            throw _exception;
        }

        if (_isEndOfFileRead)
        {
            throw new SshException("Attempting to read beyond the end of the file.");
        }

        BufferedRead nextChunk;

        lock (_readLock)
        {
            // Block until the chunk we need has been queued, or a failure (including disposal)
            // has been recorded.
            while (!_queue.TryGetValue(_nextChunkIndex, out nextChunk) && _exception == null)
            {
                Monitor.Wait(_readLock);
            }

            if (_exception != null)
            {
                throw _exception;
            }

            byte[] data = nextChunk.Data;

            if (nextChunk.Offset == _offset)
            {
                if (data.Length == 0)
                {
                    // End of file: the chunk is intentionally left in the queue and the
                    // position is not advanced.
                    _isEndOfFileRead = true;
                }
                else
                {
                    _queue.Remove(_nextChunkIndex);
                    _offset += (ulong)data.Length;
                    _nextChunkIndex++;
                }

                // Free a slot so that the read-ahead thread can issue another request.
                _semaphore.Release();
                return data;
            }

            // The chunk starts beyond our current position, which happens when an earlier
            // chunk delivered fewer than _chunkSize bytes. If the chunk is empty and we are
            // exactly at the known file size, this is simply the end of the file.
            if (data.Length == 0 && _fileSize.HasValue && _offset == (ulong)_fileSize.Value)
            {
                _isEndOfFileRead = true;
                _semaphore.Release();
                return data;
            }
        }

        // Synchronously fetch the bytes between our position and the start of the queued chunk.
        // This request is made outside of _readLock - presumably so that read callbacks, which
        // take the same lock, are not blocked while we wait for the server.
        ulong bytesToCatchUp = nextChunk.Offset - _offset;
        byte[] read = _sftpSession.RequestRead(_handle, _offset, (uint)bytesToCatchUp);

        if (read.Length == 0)
        {
            lock (_readLock)
            {
                // An empty catch-up read is only acceptable when the queued chunk itself
                // signalled end of file.
                if (nextChunk.Data.Length != 0)
                {
                    _exception = new SshException("Unexpectedly reached end of file.");
                    _semaphore.Release();
                    throw _exception;
                }

                _isEndOfFileRead = true;
                _semaphore.Release();
                return read;
            }
        }

        _offset += (ulong)read.Length;
        return read;
    }

    /// <summary>
    /// Finalizer; performs no cleanup because <see cref="Dispose(bool)"/> only acts when
    /// called with <c>true</c>.
    /// </summary>
    ~SftpFileReader()
    {
        Dispose(false);
    }

    /// <summary>
    /// Stops the read-ahead thread, releases the synchronization primitives and closes the
    /// remote file handle.
    /// </summary>
    public void Dispose()
    {
        Dispose(true);
        GC.SuppressFinalize(this);
    }

    /// <summary>
    /// Releases the resources held by this instance.
    /// </summary>
    /// <param name="disposing">
    /// <c>true</c> to release resources; when <c>false</c> the method does nothing.
    /// </param>
    /// <remarks>
    /// An exception thrown while closing the remote handle is propagated to the caller, but
    /// the instance is marked as disposed regardless so that a subsequent call is a no-op.
    /// </remarks>
    protected void Dispose(bool disposing)
    {
        if (_disposingWaitHandle == null || !disposing)
        {
            return;
        }

        // Make any current or future Read() fail.
        _exception = new ObjectDisposedException(GetType().FullName);

        // Interrupt the wait in the read-ahead thread, and wait for that thread to finish.
        _disposingWaitHandle.Set();
        _readAheadCompleted.WaitOne();

        lock (_readLock)
        {
            _semaphore.Dispose();

            // Wake up a Read() that is blocked waiting for a chunk.
            Monitor.PulseAll(_readLock);
        }

        try
        {
            SftpCloseAsyncResult closeResult = _sftpSession.BeginClose(_handle, null, null);
            _sftpSession.EndClose(closeResult);
        }
        finally
        {
            // Always release the wait handles and mark the instance as disposed, even when
            // closing the remote handle fails; otherwise a second Dispose() would operate on
            // handles that may already have been disposed.
            _readAheadCompleted.Dispose();
            _disposingWaitHandle.Dispose();
            _disposingWaitHandle = null;
        }
    }

    /// <summary>
    /// Starts the thread that keeps issuing asynchronous read requests until end of file is
    /// received, a failure is recorded, or the instance is being disposed.
    /// </summary>
    private void StartReadAhead()
    {
        ThreadAbstraction.ExecuteThread(delegate
        {
            while (!_endOfFileReceived && _exception == null)
            {
                if (!ContinueReadAhead())
                {
                    // Wake up a Read() that may be waiting for a chunk that will never arrive.
                    lock (_readLock)
                    {
                        Monitor.PulseAll(_readLock);
                    }

                    break;
                }

                // Use a bounded wait so that the loop conditions are re-evaluated regularly.
                if (!_semaphore.Wait(ReadAheadWaitTimeoutInMilliseconds))
                {
                    continue;
                }

                // State may have changed while we were waiting for a slot.
                if (_endOfFileReceived || _exception != null)
                {
                    break;
                }

                try
                {
                    _sftpSession.BeginRead(_handle, _readAheadOffset, _chunkSize, ReadCompleted, new BufferedRead(_readAheadChunkIndex, _readAheadOffset));
                }
                catch (Exception cause)
                {
                    HandleFailure(cause);
                    break;
                }

                _readAheadOffset += _chunkSize;
                _readAheadChunkIndex++;
            }

            // Dispose(bool) waits for this signal before it releases any resources.
            _readAheadCompleted.Set();
        });
    }

    /// <summary>
    /// Waits until either disposal is signalled or a semaphore slot becomes available.
    /// </summary>
    /// <returns>
    /// <c>true</c> when the read-ahead should continue; <c>false</c> when the dispose handle
    /// was signalled or the wait failed, in which case the failure is stored in
    /// <c>_exception</c>.
    /// </returns>
    private bool ContinueReadAhead()
    {
        try
        {
            int signaledIndex = _sftpSession.WaitAny(_waitHandles, _sftpSession.OperationTimeout);
            switch (signaledIndex)
            {
                case 0:
                    // Dispose handle (first handle passed to CreateWaitHandleArray).
                    return false;
                case 1:
                    // Semaphore availability handle (second handle).
                    return true;
                default:
                    throw new NotImplementedException(string.Format(CultureInfo.InvariantCulture, "WaitAny return value '{0}' is not implemented.", signaledIndex));
            }
        }
        catch (Exception ex)
        {
            // Recorded so that Read() rethrows it; this also covers the NotImplementedException above.
            _exception = ex;
            return false;
        }
    }

    /// <summary>
    /// Callback for a completed read-ahead request; queues the received chunk and wakes up
    /// any waiting <see cref="Read"/>.
    /// </summary>
    /// <param name="result">The <see cref="SftpReadAsyncResult"/> of the completed request.</param>
    private void ReadCompleted(IAsyncResult result)
    {
        SftpReadAsyncResult readAsyncResult = (SftpReadAsyncResult)result;

        byte[] data;
        try
        {
            data = readAsyncResult.EndInvoke();
        }
        catch (Exception cause)
        {
            HandleFailure(cause);
            return;
        }

        // The BufferedRead created in StartReadAhead() travels along as the async state.
        BufferedRead bufferedRead = (BufferedRead)readAsyncResult.AsyncState;
        bufferedRead.Complete(data);

        lock (_readLock)
        {
            _queue.Add(bufferedRead.ChunkIndex, bufferedRead);
            Monitor.PulseAll(_readLock);
        }

        // A zero-length response stops the read-ahead loop.
        if (data.Length == 0)
        {
            _endOfFileReceived = true;
        }
    }

    /// <summary>
    /// Records a failure, releases a semaphore slot and wakes up any waiting <see cref="Read"/>.
    /// </summary>
    /// <param name="cause">The exception to rethrow from <see cref="Read"/>.</param>
    private void HandleFailure(Exception cause)
    {
        _exception = cause;

        // Release a slot so that the read-ahead thread is not left waiting on the semaphore.
        _semaphore.Release();

        lock (_readLock)
        {
            Monitor.PulseAll(_readLock);
        }
    }
}
}