// PartitionedDownloader
// class PartitionedDownloader
using Azure.Storage.Blobs.Models;
using Azure.Storage.Blobs.Specialized;
using Azure.Storage.Common;
using System;
using System.Buffers;
using System.Collections.Generic;
using System.Globalization;
using System.IO;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
namespace Azure.Storage.Blobs
{
internal class PartitionedDownloader
{
// Operation-name strings. They are not referenced in the visible part of this file;
// NOTE(review): presumably used as diagnostic scope names inside the async state machines — confirm.
private const string _operationName = "BlobBaseClient.DownloadTo";
private const string _innerOperationName = "BlobBaseClient.DownloadStreaming";
// Client supplied to the constructor.
private readonly BlobBaseClient _client;
// Taken from StorageTransferOptions.MaximumConcurrency when it is positive; otherwise 5 (see constructor).
private readonly int _maxWorkerCount;
// Size used for the initial range; chosen in the constructor from InitialTransferSize or a default.
private readonly long _initialRangeSize;
// Size of every range produced by GetRanges; chosen in the constructor and capped at 268435456.
private readonly long _rangeSize;
// Checksum algorithm copied from the DownloadTransferValidationOptions passed to the constructor.
private readonly StorageChecksumAlgorithm _validationAlgorithm;
// Result of ContentHasher.GetHashSizeInBytes(_validationAlgorithm).
private readonly int _checksumSize;
// Not readonly: it is read by ValidateFinalCrc but assigned outside the visible code
// (NOTE(review): presumably inside the download state machine — confirm).
private StorageCrc64HashAlgorithm _masterCrcCalculator;
// Optional progress sink; the constructor wraps it in an AggregatingProgressIncrementer if it is not one already.
private readonly IProgress<long> _progress;
// Buffer pool; falls back to ArrayPool<byte>.Shared when the caller passes null.
private readonly ArrayPool<byte> _arrayPool;
// True when the validation algorithm, after ResolveAuto(), has the numeric value 3
// (NOTE(review): presumably StorageChecksumAlgorithm.StorageCrc64 — confirm against the enum definition).
private bool UseMasterCrc => (int)_validationAlgorithm.ResolveAuto() == 3;
/// <summary>
/// Builds a fresh <see cref="DownloadTransferValidationOptions"/> on every access, carrying this
/// downloader's checksum algorithm with automatic checksum validation switched off.
/// </summary>
/// <remarks>
/// NOTE(review): presumably handed to the per-range download calls so that this class can verify
/// checksums itself — the consumers are not visible in this listing; confirm.
/// </remarks>
private DownloadTransferValidationOptions ValidationOptions => new DownloadTransferValidationOptions
{
    ChecksumAlgorithm = _validationAlgorithm,
    AutoValidateChecksum = false
};
/// <summary>
/// Creates a downloader that splits a blob download into ranges.
/// </summary>
/// <param name="client">Blob client stored for later use by the download methods.</param>
/// <param name="transferOptions">
/// Concurrency and size hints. Missing or non-positive values fall back to defaults:
/// 5 workers, a 4 MiB range size, and an initial range size that depends on the checksum algorithm.
/// A supplied maximum transfer size is capped at 256 MiB.
/// </param>
/// <param name="transferValidation">
/// Required. Must have <c>AutoValidateChecksum</c> set to <c>true</c>.
/// </param>
/// <param name="progress">
/// Optional progress sink; wrapped in an <see cref="AggregatingProgressIncrementer"/> unless it already is one.
/// </param>
/// <param name="arrayPool">Optional buffer pool; defaults to <see cref="ArrayPool{T}.Shared"/>.</param>
/// <exception cref="ArgumentNullException">
/// NOTE(review): presumably what <c>Argument.AssertNotNull</c> throws when
/// <paramref name="transferValidation"/> is null — confirm against that helper.
/// </exception>
public PartitionedDownloader(BlobBaseClient client, StorageTransferOptions transferOptions = default(StorageTransferOptions), DownloadTransferValidationOptions transferValidation = null, IProgress<long> progress = null, ArrayPool<byte> arrayPool = null)
{
    const int DefaultMaxWorkerCount = 5;
    const long FourMiB = 4194304;
    const long TwoHundredFiftySixMiB = 268435456;

    // Validate first. Nothing that follows can throw before these checks in the original ordering
    // either, so callers observe exactly the same exceptions.
    Argument.AssertNotNull(transferValidation, nameof(transferValidation));
    if (!transferValidation.AutoValidateChecksum)
    {
        throw Errors.CannotDeferTransactionalHashVerification();
    }

    _client = client;
    _arrayPool = arrayPool ?? ArrayPool<byte>.Shared;

    // A missing value collapses to 0 and therefore takes the same path as a non-positive one.
    int requestedConcurrency = transferOptions.MaximumConcurrency.GetValueOrDefault();
    _maxWorkerCount = requestedConcurrency > 0 ? requestedConcurrency : DefaultMaxWorkerCount;

    long requestedRangeSize = transferOptions.MaximumTransferSize.GetValueOrDefault();
    _rangeSize = requestedRangeSize > 0
        ? Math.Min(requestedRangeSize, TwoHundredFiftySixMiB)
        : FourMiB;

    long requestedInitialSize = transferOptions.InitialTransferSize.GetValueOrDefault();
    if (requestedInitialSize > 0)
    {
        _initialRangeSize = requestedInitialSize;
    }
    else
    {
        // The algorithm is compared numerically against 1, exactly as before.
        // NOTE(review): value 1 is presumably StorageChecksumAlgorithm.None — confirm against the enum.
        bool algorithmValueIsOne = (int)transferValidation.ChecksumAlgorithm == 1;
        _initialRangeSize = algorithmValueIsOne ? TwoHundredFiftySixMiB : FourMiB;
    }

    _validationAlgorithm = transferValidation.ChecksumAlgorithm;
    _checksumSize = ContentHasher.GetHashSizeInBytes(_validationAlgorithm);

    // Wrap the caller's progress sink unless it is absent or already the aggregating type.
    _progress = progress;
    if (_progress != null && !(_progress is AggregatingProgressIncrementer))
    {
        _progress = new AggregatingProgressIncrementer(_progress);
    }
}
/// <summary>
/// Kick-off stub of a compiler-generated async method: copies the arguments into the
/// <c>&lt;DownloadToInternal&gt;d__16</c> state machine, starts it, and returns the builder's task.
/// </summary>
/// <remarks>
/// The download logic itself lives in the state machine type, which is not part of this listing.
/// NOTE(review): <paramref name="async"/> presumably selects asynchronous vs. synchronous I/O — confirm
/// in the state machine body.
/// </remarks>
/// <returns>The <see cref="Task{TResult}"/> produced by the state machine's method builder.</returns>
[AsyncStateMachine(typeof(<DownloadToInternal>d__16))]
public Task<Response> DownloadToInternal(Stream destination, BlobRequestConditions conditions, bool async, CancellationToken cancellationToken)
{
<DownloadToInternal>d__16 stateMachine = default(<DownloadToInternal>d__16);
stateMachine.<>t__builder = AsyncTaskMethodBuilder<Response>.Create();
// Hand the instance and every argument to the state machine.
stateMachine.<>4__this = this;
stateMachine.destination = destination;
stateMachine.conditions = conditions;
stateMachine.async = async;
stateMachine.cancellationToken = cancellationToken;
// -1 is the state value assigned before the machine is started.
stateMachine.<>1__state = -1;
stateMachine.<>t__builder.Start(ref stateMachine);
return stateMachine.<>t__builder.Task;
}
/// <summary>
/// Kick-off stub of a compiler-generated async method: copies the arguments into the
/// <c>&lt;HandleOneShotDownload&gt;d__17</c> state machine, starts it, and returns the builder's task.
/// </summary>
/// <remarks>
/// The method's logic lives in the state machine type, which is not part of this listing.
/// NOTE(review): judging by the name, this presumably handles the case where a single response
/// already covers the whole blob — confirm in the state machine body.
/// </remarks>
/// <returns>The <see cref="Task"/> produced by the state machine's method builder.</returns>
[AsyncStateMachine(typeof(<HandleOneShotDownload>d__17))]
private Task HandleOneShotDownload(Response<BlobDownloadStreamingResult> response, Stream destination, bool async, CancellationToken cancellationToken)
{
<HandleOneShotDownload>d__17 stateMachine = default(<HandleOneShotDownload>d__17);
stateMachine.<>t__builder = AsyncTaskMethodBuilder.Create();
// Hand the instance and every argument to the state machine.
stateMachine.<>4__this = this;
stateMachine.response = response;
stateMachine.destination = destination;
stateMachine.async = async;
stateMachine.cancellationToken = cancellationToken;
// -1 is the state value assigned before the machine is started.
stateMachine.<>1__state = -1;
stateMachine.<>t__builder.Start(ref stateMachine);
return stateMachine.<>t__builder.Task;
}
/// <summary>
/// Kick-off stub of a compiler-generated async method: copies the arguments into the
/// <c>&lt;FinalizeDownloadInternal&gt;d__18</c> state machine, starts it, and returns the builder's task.
/// </summary>
/// <remarks>
/// The method's logic lives in the state machine type, which is not part of this listing.
/// NOTE(review): <paramref name="composedCrc"/> is presumably the CRC composed from the individual
/// ranges, later checked via ValidateFinalCrc — confirm in the state machine body.
/// </remarks>
/// <returns>The <see cref="Task"/> produced by the state machine's method builder.</returns>
[AsyncStateMachine(typeof(<FinalizeDownloadInternal>d__18))]
private Task FinalizeDownloadInternal(Stream destination, Memory<byte> composedCrc, bool async, CancellationToken cancellationToken)
{
<FinalizeDownloadInternal>d__18 stateMachine = default(<FinalizeDownloadInternal>d__18);
stateMachine.<>t__builder = AsyncTaskMethodBuilder.Create();
// Hand the instance and every argument to the state machine.
stateMachine.<>4__this = this;
stateMachine.destination = destination;
stateMachine.composedCrc = composedCrc;
stateMachine.async = async;
stateMachine.cancellationToken = cancellationToken;
// -1 is the state value assigned before the machine is started.
stateMachine.<>1__state = -1;
stateMachine.<>t__builder.Start(ref stateMachine);
return stateMachine.<>t__builder.Task;
}
/// <summary>
/// Extracts the number that follows the first '/' in a range header value
/// (for example the <c>1234</c> in <c>bytes 0-99/1234</c>).
/// </summary>
/// <param name="range">The range string to parse; may be null.</param>
/// <returns>The parsed value after the '/', or 0 when <paramref name="range"/> is null.</returns>
/// <exception cref="FormatException">The text after the '/' is not a valid integer.</exception>
/// <remarks>
/// When no '/' is present, the exception created by <c>BlobErrors.ParsingFullHttpRangeFailed</c> is thrown.
/// </remarks>
private static long ParseRangeTotalLength(string range)
{
    if (range == null)
    {
        return 0;
    }

    // Ordinal char search: the separator is a protocol token, so culture-sensitive,
    // case-insensitive matching is neither needed nor appropriate here.
    int separatorIndex = range.IndexOf('/');
    if (separatorIndex == -1)
    {
        throw BlobErrors.ParsingFullHttpRangeFailed(range);
    }

    return long.Parse(range.Substring(separatorIndex + 1), CultureInfo.InvariantCulture);
}
/// <summary>
/// Kick-off stub of a compiler-generated async method: copies the arguments into the
/// <c>&lt;CopyToInternal&gt;d__20</c> state machine, starts it, and returns the builder's task.
/// </summary>
/// <remarks>
/// The method's logic lives in the state machine type, which is not part of this listing.
/// NOTE(review): presumably copies the response content into <paramref name="destination"/> and
/// writes a checksum into <paramref name="checksumBuffer"/> — confirm in the state machine body.
/// </remarks>
/// <returns>The <see cref="Task"/> produced by the state machine's method builder.</returns>
[AsyncStateMachine(typeof(<CopyToInternal>d__20))]
private Task CopyToInternal(Response<BlobDownloadStreamingResult> response, Stream destination, Memory<byte> checksumBuffer, bool async, CancellationToken cancellationToken)
{
<CopyToInternal>d__20 stateMachine = default(<CopyToInternal>d__20);
stateMachine.<>t__builder = AsyncTaskMethodBuilder.Create();
// Hand the instance and every argument to the state machine.
stateMachine.<>4__this = this;
stateMachine.response = response;
stateMachine.destination = destination;
stateMachine.checksumBuffer = checksumBuffer;
stateMachine.async = async;
stateMachine.cancellationToken = cancellationToken;
// -1 is the state value assigned before the machine is started.
stateMachine.<>1__state = -1;
stateMachine.<>t__builder.Start(ref stateMachine);
return stateMachine.<>t__builder.Task;
}
/// <summary>
/// Lazily yields consecutive ranges of at most <c>_rangeSize</c> bytes, starting at
/// <paramref name="initialLength"/> and stopping once <paramref name="totalLength"/> is reached.
/// </summary>
/// <param name="initialLength">Offset of the first range to produce.</param>
/// <param name="totalLength">Exclusive upper bound for the produced ranges.</param>
/// <returns>
/// A sequence of ranges; the last one is shortened so it does not extend past
/// <paramref name="totalLength"/>. Empty when <paramref name="initialLength"/> is not below it.
/// </returns>
private IEnumerable<HttpRange> GetRanges(long initialLength, long totalLength)
{
    long position = initialLength;
    while (position < totalLength)
    {
        long remaining = totalLength - position;
        long? length = Math.Min(remaining, _rangeSize);
        yield return new HttpRange(position, length);
        position += _rangeSize;
    }
}
/// <summary>
/// Kick-off stub of a compiler-generated async method: copies the arguments into the
/// <c>&lt;FlushFinalIfNecessaryInternal&gt;d__22</c> state machine, starts it, and returns the builder's task.
/// </summary>
/// <remarks>
/// The method's logic lives in the state machine type, which is not part of this listing.
/// NOTE(review): presumably flushes <paramref name="destination"/> under some condition — confirm
/// in the state machine body.
/// </remarks>
/// <returns>The <see cref="Task"/> produced by the state machine's method builder.</returns>
[AsyncStateMachine(typeof(<FlushFinalIfNecessaryInternal>d__22))]
private Task FlushFinalIfNecessaryInternal(Stream destination, bool async, CancellationToken cancellationToken)
{
<FlushFinalIfNecessaryInternal>d__22 stateMachine = default(<FlushFinalIfNecessaryInternal>d__22);
stateMachine.<>t__builder = AsyncTaskMethodBuilder.Create();
// Hand the instance and every argument to the state machine.
stateMachine.<>4__this = this;
stateMachine.destination = destination;
stateMachine.async = async;
stateMachine.cancellationToken = cancellationToken;
// -1 is the state value assigned before the machine is started.
stateMachine.<>1__state = -1;
stateMachine.<>t__builder.Start(ref stateMachine);
return stateMachine.<>t__builder.Task;
}
/// <summary>
/// Compares the current hash of <c>_masterCrcCalculator</c> with <paramref name="composedCrc"/>
/// and throws when the two byte sequences differ.
/// </summary>
/// <param name="composedCrc">The checksum bytes to compare against the calculator's current hash.</param>
/// <remarks>
/// A buffer is rented from <c>_arrayPool</c> for the comparison and released when the method exits,
/// whether it returns or throws. On mismatch the exception created by <c>Errors.ChecksumMismatch</c> is thrown.
/// </remarks>
private void ValidateFinalCrc(ReadOnlySpan<byte> composedCrc)
{
    Span<byte> calculatedCrc;
    // 8 is the size passed to the rental helper (NOTE(review): presumably the CRC64 size in bytes — confirm).
    using (_arrayPool.RentAsSpanDisposable(8, out calculatedCrc))
    {
        _masterCrcCalculator.GetCurrentHash(calculatedCrc);

        bool checksumsAgree = calculatedCrc.SequenceEqual(composedCrc);
        if (checksumsAgree)
        {
            return;
        }

        throw Errors.ChecksumMismatch(calculatedCrc, composedCrc);
    }
}
}
}