<PackageReference Include="SSH.NET" Version="2016.1.0-beta2" />

SftpFileReader

using Renci.SshNet.Abstractions;
using Renci.SshNet.Common;
using System;
using System.Collections.Generic;
using System.Globalization;
using System.Threading;

namespace Renci.SshNet.Sftp
{
    /// <summary>
    /// Reads a remote file sequentially in chunks. A background thread issues
    /// asynchronous read requests ahead of the consumer; completed chunks are
    /// queued by chunk index and handed out in order by <see cref="Read"/>.
    /// </summary>
    internal class SftpFileReader : ISftpFileReader, IDisposable
    {
        /// <summary>
        /// A single read-ahead request: the chunk index and file offset it was
        /// issued for, plus the data once the request has completed.
        /// </summary>
        internal class BufferedRead
        {
            public int ChunkIndex { get; set; }

            public byte[] Data { get; set; }

            public ulong Offset { get; set; }

            public BufferedRead(int chunkIndex, ulong offset)
            {
                ChunkIndex = chunkIndex;
                Offset = offset;
            }

            /// <summary>
            /// Stores the data returned for this request.
            /// </summary>
            public void Complete(byte[] data)
            {
                Data = data;
            }
        }

        /// <summary>
        /// Maximum time, in milliseconds, the read-ahead loop waits on the
        /// semaphore before re-checking its exit conditions.
        /// </summary>
        private const int ReadAheadWaitTimeoutInMilliseconds = 1000;

        private readonly byte[] _handle;
        private readonly ISftpSession _sftpSession;
        private readonly uint _chunkSize;

        // Offset of the next byte to hand to the caller of Read().
        private ulong _offset;

        private readonly long? _fileSize;

        // Completed read-ahead chunks, keyed by chunk index; guarded by _readLock.
        private readonly Dictionary<int, BufferedRead> _queue;

        // Index 0: disposing handle, index 1: semaphore availability handle
        // (see ContinueReadAhead for how the indices are interpreted).
        private readonly WaitHandle[] _waitHandles;

        private int _readAheadChunkIndex;
        private ulong _readAheadOffset;

        // Set by the read-ahead thread when its loop has exited.
        private readonly ManualResetEvent _readAheadCompleted;

        // Null once the reader has been disposed.
        private ManualResetEvent _disposingWaitHandle;

        private int _nextChunkIndex;
        private bool _endOfFileReceived;
        private bool _isEndOfFileRead;
        private readonly SemaphoreLight _semaphore;
        private readonly object _readLock;

        // Failure to surface from Read(); also stops the read-ahead loop.
        private Exception _exception;

        /// <summary>
        /// Creates the reader and immediately starts the read-ahead thread.
        /// </summary>
        /// <param name="handle">The handle of the file to read.</param>
        /// <param name="sftpSession">The session used to issue read and close requests.</param>
        /// <param name="chunkSize">The number of bytes requested per read-ahead request.</param>
        /// <param name="maxPendingReads">The initial count of the semaphore that gates read-ahead requests.</param>
        /// <param name="fileSize">The size of the file, or <c>null</c> when it is not known.</param>
        public SftpFileReader(byte[] handle, ISftpSession sftpSession, uint chunkSize, int maxPendingReads, long? fileSize)
        {
            _handle = handle;
            _sftpSession = sftpSession;
            _chunkSize = chunkSize;
            _fileSize = fileSize;
            _semaphore = new SemaphoreLight(maxPendingReads);
            _queue = new Dictionary<int, BufferedRead>(maxPendingReads);
            _readLock = new object();
            _readAheadCompleted = new ManualResetEvent(false);
            _disposingWaitHandle = new ManualResetEvent(false);
            _waitHandles = _sftpSession.CreateWaitHandleArray(_disposingWaitHandle, _semaphore.AvailableWaitHandle);

            StartReadAhead();
        }

        /// <summary>
        /// Returns the next block of data from the file.
        /// </summary>
        /// <returns>
        /// The next bytes of the file, or an empty array when the end of the file has been reached.
        /// </returns>
        /// <exception cref="ObjectDisposedException">The reader has been disposed.</exception>
        /// <exception cref="SshException">
        /// A read is attempted after the end of the file was already returned, or the
        /// server reported end of file where data was still expected.
        /// </exception>
        public byte[] Read()
        {
            if (_disposingWaitHandle == null)
            {
                throw new ObjectDisposedException(GetType().FullName);
            }

            if (_exception != null)
            {
                throw _exception;
            }

            if (_isEndOfFileRead)
            {
                throw new SshException("Attempting to read beyond the end of the file.");
            }

            BufferedRead nextChunk;

            lock (_readLock)
            {
                // Block until the chunk we need has been queued or a failure was recorded.
                while (!_queue.TryGetValue(_nextChunkIndex, out nextChunk) && _exception == null)
                {
                    Monitor.Wait(_readLock);
                }

                if (_exception != null)
                {
                    throw _exception;
                }

                byte[] data = nextChunk.Data;

                if (nextChunk.Offset == _offset)
                {
                    if (data.Length == 0)
                    {
                        // End of file: the remaining state is deliberately left untouched.
                        _isEndOfFileRead = true;
                    }
                    else
                    {
                        _queue.Remove(_nextChunkIndex);
                        _offset += (ulong)data.Length;
                        _nextChunkIndex++;
                    }

                    // Free a slot so the read-ahead loop can issue another request.
                    _semaphore.Release();
                    return data;
                }

                // The queued chunk signals end of file and we have already delivered
                // exactly the known file size: report end of file without another
                // request to the server.
                if (data.Length == 0 && _fileSize.HasValue && _offset == (ulong)_fileSize.Value)
                {
                    _isEndOfFileRead = true;
                    _semaphore.Release();
                    return nextChunk.Data;
                }
            }

            // The queued chunk starts beyond our current offset, so an earlier chunk
            // delivered fewer bytes than were requested. Fetch the gap synchronously;
            // this request is made outside of the read lock.
            ulong bytesToCatchUp = nextChunk.Offset - _offset;
            byte[] read = _sftpSession.RequestRead(_handle, _offset, (uint)bytesToCatchUp);

            if (read.Length == 0)
            {
                lock (_readLock)
                {
                    // An empty catch-up read is only acceptable when the queued chunk
                    // also signals end of file.
                    if (nextChunk.Data.Length != 0)
                    {
                        _exception = new SshException("Unexpectedly reached end of file.");
                        if (_semaphore != null)
                        {
                            _semaphore.Release();
                        }

                        throw _exception;
                    }

                    _isEndOfFileRead = true;
                    if (_semaphore != null)
                    {
                        _semaphore.Release();
                    }

                    return read;
                }
            }

            _offset += (uint)read.Length;
            return read;
        }

        ~SftpFileReader()
        {
            // Dispose(false) performs no work; see Dispose(bool).
            Dispose(false);
        }

        /// <summary>
        /// Stops read-ahead, closes the file handle and releases the wait handles.
        /// </summary>
        public void Dispose()
        {
            Dispose(true);
            GC.SuppressFinalize(this);
        }

        /// <summary>
        /// Releases the resources held by the reader. Nothing is done when
        /// <paramref name="disposing"/> is <c>false</c> or the reader was already disposed.
        /// </summary>
        /// <param name="disposing"><c>true</c> when called from <see cref="Dispose()"/>.</param>
        protected void Dispose(bool disposing)
        {
            if (_disposingWaitHandle == null || !disposing)
            {
                return;
            }

            // Make pending and future Read() calls fail, and stop the read-ahead loop.
            _exception = new ObjectDisposedException(GetType().FullName);
            _disposingWaitHandle.Set();

            // Wait for the read-ahead thread to exit before tearing anything down.
            _readAheadCompleted.WaitOne();

            lock (_readLock)
            {
                _semaphore.Dispose();

                // Wake any Read() call blocked in Monitor.Wait.
                Monitor.PulseAll(_readLock);
            }

            try
            {
                SftpCloseAsyncResult closeAsyncResult = _sftpSession.BeginClose(_handle, null, null);
                _sftpSession.EndClose(closeAsyncResult);
            }
            finally
            {
                // Release local resources and mark the instance as disposed even when
                // closing the handle fails; the failure itself still propagates.
                _readAheadCompleted.Dispose();
                _disposingWaitHandle.Dispose();
                _disposingWaitHandle = null;
            }
        }

        /// <summary>
        /// Starts the thread that issues read requests ahead of the consumer until
        /// end of file is received, a failure is recorded or the reader is disposed.
        /// </summary>
        private void StartReadAhead()
        {
            ThreadAbstraction.ExecuteThread(delegate
            {
                while (!_endOfFileReceived)
                {
                    if (_exception != null)
                    {
                        break;
                    }

                    if (!ContinueReadAhead())
                    {
                        // Wake a Read() call that may be waiting for a chunk that
                        // will now never arrive.
                        lock (_readLock)
                        {
                            Monitor.PulseAll(_readLock);
                        }

                        break;
                    }

                    if (_semaphore.Wait(ReadAheadWaitTimeoutInMilliseconds))
                    {
                        // State may have changed while we were waiting for a slot.
                        if (_endOfFileReceived || _exception != null)
                        {
                            break;
                        }

                        try
                        {
                            _sftpSession.BeginRead(
                                _handle,
                                _readAheadOffset,
                                _chunkSize,
                                ReadCompleted,
                                new BufferedRead(_readAheadChunkIndex, _readAheadOffset));
                        }
                        catch (Exception cause)
                        {
                            HandleFailure(cause);
                            break;
                        }

                        _readAheadOffset += _chunkSize;
                        _readAheadChunkIndex++;
                    }
                }

                _readAheadCompleted.Set();
            });
        }

        /// <summary>
        /// Waits until either the reader is being disposed or the semaphore signals availability.
        /// </summary>
        /// <returns>
        /// <c>true</c> when read-ahead should proceed; <c>false</c> when the reader is
        /// being disposed or the wait failed (the failure is recorded for <see cref="Read"/>).
        /// </returns>
        private bool ContinueReadAhead()
        {
            try
            {
                int signaledIndex = _sftpSession.WaitAny(_waitHandles, _sftpSession.OperationTimeout);
                switch (signaledIndex)
                {
                    case 0:
                        // Disposing handle was signaled.
                        return false;
                    case 1:
                        // Semaphore availability handle was signaled.
                        return true;
                    default:
                        throw new NotImplementedException(string.Format(CultureInfo.InvariantCulture, "WaitAny return value '{0}' is not implemented.", signaledIndex));
                }
            }
            catch (Exception ex)
            {
                // Keep the first recorded failure; do not overwrite it with a later one.
                Interlocked.CompareExchange(ref _exception, ex, null);
                return false;
            }
        }

        /// <summary>
        /// Callback for a read-ahead request: queues the completed chunk and wakes
        /// waiting readers, or records the failure.
        /// </summary>
        private void ReadCompleted(IAsyncResult result)
        {
            SftpReadAsyncResult readAsyncResult = (SftpReadAsyncResult)result;

            byte[] data;
            try
            {
                data = readAsyncResult.EndInvoke();
            }
            catch (Exception cause)
            {
                HandleFailure(cause);
                return;
            }

            BufferedRead bufferedRead = (BufferedRead)readAsyncResult.AsyncState;
            bufferedRead.Complete(data);

            lock (_readLock)
            {
                _queue.Add(bufferedRead.ChunkIndex, bufferedRead);
                Monitor.PulseAll(_readLock);
            }

            // An empty response marks end of file and stops further read-ahead.
            if (data.Length == 0)
            {
                _endOfFileReceived = true;
            }
        }

        /// <summary>
        /// Records a failure and unblocks both the read-ahead loop and any waiting <see cref="Read"/> call.
        /// </summary>
        private void HandleFailure(Exception cause)
        {
            // Keep the first recorded failure; do not overwrite it with a later one.
            Interlocked.CompareExchange(ref _exception, cause, null);

            // Unblock the semaphore wait in the read-ahead loop.
            _semaphore.Release();

            // Unblock Read().
            lock (_readLock)
            {
                Monitor.PulseAll(_readLock);
            }
        }
    }
}