PartitionedUploader<TServiceSpecificData, TCompleteUploadReturn>
using Azure.Core.Pipeline;
using Azure.Storage.Common;
using System;
using System.Buffers;
using System.Collections.Generic;
using System.IO;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
namespace Azure.Storage
{
internal class PartitionedUploader<TServiceSpecificData, TCompleteUploadReturn>
{
private readonly struct ContentPartition<TContent>
{
public long AbsolutePosition { get; }
public long Length { get; }
public TContent Content { get; }
public ReadOnlyMemory<byte> ContentChecksum { get; }
public ContentPartition(long position, long length, TContent content, ReadOnlyMemory<byte> contentChecksum)
{
AbsolutePosition = position;
Length = length;
Content = content;
ContentChecksum = contentChecksum;
}
}
private delegate Task StageContentPartitionAsync<TContent> (TContent content, long offset, TServiceSpecificData args, UploadTransferValidationOptions validationOptions, IProgress<long> progressHandler, bool async, CancellationToken cancellationToken);
private delegate Task<(Stream PartitionContent, ReadOnlyMemory<byte> PartitionChecksum)> GetNextStreamPartition (Stream stream, long count, long absolutePosition, bool async, CancellationToken cancellationToken);
public delegate DiagnosticScope CreateScope (string operationName);
public delegate Task InitializeDestinationInternal (TServiceSpecificData args, bool async, CancellationToken cancellationToken);
public delegate Task<Response<TCompleteUploadReturn>> SingleUploadStreamingInternal (Stream contentStream, TServiceSpecificData args, IProgress<long> progressHandler, UploadTransferValidationOptions transferValidation, string operationName, bool async, CancellationToken cancellationToken);
public delegate Task<Response<TCompleteUploadReturn>> SingleUploadBinaryDataInternal (BinaryData content, TServiceSpecificData args, IProgress<long> progressHandler, UploadTransferValidationOptions transferValidation, string operationName, bool async, CancellationToken cancellationToken);
public delegate Task UploadPartitionStreamingInternal (Stream contentStream, long offset, TServiceSpecificData args, IProgress<long> progressHandler, UploadTransferValidationOptions transferValidation, bool async, CancellationToken cancellationToken);
public delegate Task UploadPartitionBinaryDataInternal (BinaryData content, long offset, TServiceSpecificData args, IProgress<long> progressHandler, UploadTransferValidationOptions transferValidation, bool async, CancellationToken cancellationToken);
public delegate Task<Response<TCompleteUploadReturn>> CommitPartitionedUploadInternal (List<(long Offset, long Size)> partitions, TServiceSpecificData args, bool async, CancellationToken cancellationToken);
public struct Behaviors
{
public InitializeDestinationInternal InitializeDestination {
[IsReadOnly]
get;
set;
}
public SingleUploadStreamingInternal SingleUploadStreaming {
[IsReadOnly]
get;
set;
}
public SingleUploadBinaryDataInternal SingleUploadBinaryData {
[IsReadOnly]
get;
set;
}
public UploadPartitionStreamingInternal UploadPartitionStreaming {
[IsReadOnly]
get;
set;
}
public UploadPartitionBinaryDataInternal UploadPartitionBinaryData {
[IsReadOnly]
get;
set;
}
public CommitPartitionedUploadInternal CommitPartitionedUpload {
[IsReadOnly]
get;
set;
}
public CreateScope Scope {
[IsReadOnly]
get;
set;
}
}
public static readonly InitializeDestinationInternal InitializeNoOp = (TServiceSpecificData args, bool async, CancellationToken cancellationToken) => Task.CompletedTask;
private readonly InitializeDestinationInternal _initializeDestinationInternal;
private readonly SingleUploadStreamingInternal _singleUploadStreamingInternal;
private readonly SingleUploadBinaryDataInternal _singleUploadBinaryDataInternal;
private readonly UploadPartitionStreamingInternal _uploadPartitionStreamingInternal;
private readonly UploadPartitionBinaryDataInternal _uploadPartitionBinaryDataInternal;
private readonly CommitPartitionedUploadInternal _commitPartitionedUploadInternal;
private readonly CreateScope _createScope;
private readonly int _maxWorkerCount;
private readonly ArrayPool<byte> _arrayPool;
private readonly long _singleUploadThreshold;
private readonly long? _blockSize;
private readonly StorageChecksumAlgorithm _validationAlgorithm;
private Func<Memory<byte>> _masterCrcSupplier;
private readonly string _operationName;
private bool UseMasterCrc => (int)_validationAlgorithm.ResolveAuto() == 3;
private UploadTransferValidationOptions ValidationOptions {
get {
UploadTransferValidationOptions val = new UploadTransferValidationOptions();
val.set_ChecksumAlgorithm(_validationAlgorithm);
return val;
}
}
public PartitionedUploader(Behaviors behaviors, StorageTransferOptions transferOptions, UploadTransferValidationOptions transferValidation, ArrayPool<byte> arrayPool = null, string operationName = null)
{
_initializeDestinationInternal = (behaviors.InitializeDestination ?? InitializeNoOp);
_singleUploadStreamingInternal = Argument.CheckNotNull<SingleUploadStreamingInternal>(behaviors.SingleUploadStreaming, "SingleUploadStreaming");
_singleUploadBinaryDataInternal = Argument.CheckNotNull<SingleUploadBinaryDataInternal>(behaviors.SingleUploadBinaryData, "SingleUploadBinaryData");
_uploadPartitionStreamingInternal = Argument.CheckNotNull<UploadPartitionStreamingInternal>(behaviors.UploadPartitionStreaming, "UploadPartitionStreaming");
_uploadPartitionBinaryDataInternal = Argument.CheckNotNull<UploadPartitionBinaryDataInternal>(behaviors.UploadPartitionBinaryData, "UploadPartitionBinaryData");
_commitPartitionedUploadInternal = Argument.CheckNotNull<CommitPartitionedUploadInternal>(behaviors.CommitPartitionedUpload, "CommitPartitionedUpload");
_createScope = Argument.CheckNotNull<CreateScope>(behaviors.Scope, "Scope");
_arrayPool = (arrayPool ?? ArrayPool<byte>.Shared);
if (transferOptions.get_MaximumConcurrency().HasValue) {
int? maximumConcurrency = transferOptions.get_MaximumConcurrency();
int num = 0;
if ((maximumConcurrency.GetValueOrDefault() > num) & maximumConcurrency.HasValue) {
_maxWorkerCount = transferOptions.get_MaximumConcurrency().Value;
goto IL_0104;
}
}
_maxWorkerCount = 5;
goto IL_0104;
IL_0104:
if (transferOptions.get_InitialTransferSize().HasValue && transferOptions.get_InitialTransferSize().Value > 0)
_singleUploadThreshold = Math.Min(transferOptions.get_InitialTransferSize().Value, 5242880000);
else
_singleUploadThreshold = 268435456;
if (transferOptions.get_MaximumTransferSize().HasValue) {
long? maximumTransferSize = transferOptions.get_MaximumTransferSize();
long num2 = 0;
if ((maximumTransferSize.GetValueOrDefault() > num2) & maximumTransferSize.HasValue)
_blockSize = Math.Min(4194304000, transferOptions.get_MaximumTransferSize().Value);
}
_validationAlgorithm = Argument.CheckNotNull<UploadTransferValidationOptions>(transferValidation, "transferValidation").get_ChecksumAlgorithm().ResolveAuto();
ReadOnlyMemory<byte> precalculatedChecksum = transferValidation.get_PrecalculatedChecksum();
if (!precalculatedChecksum.IsEmpty) {
if (!UseMasterCrc)
throw Errors.PrecalculatedHashNotSupportedOnSplit();
precalculatedChecksum = transferValidation.get_PrecalculatedChecksum();
Memory<byte> userSuppliedMasterCrc = new Memory<byte>(new byte[precalculatedChecksum.Length]);
precalculatedChecksum = transferValidation.get_PrecalculatedChecksum();
precalculatedChecksum.CopyTo(userSuppliedMasterCrc);
_masterCrcSupplier = (() => userSuppliedMasterCrc);
}
_operationName = operationName;
}
[AsyncStateMachine(typeof(PartitionedUploader<, >.<UploadInternal>d__31))]
public Task<Response<TCompleteUploadReturn>> UploadInternal(BinaryData content, TServiceSpecificData args, IProgress<long> progressHandler, bool async, CancellationToken cancellationToken = default(CancellationToken))
{
<UploadInternal>d__31 stateMachine = default(<UploadInternal>d__31);
stateMachine.<>t__builder = AsyncTaskMethodBuilder<Response<TCompleteUploadReturn>>.Create();
stateMachine.<>4__this = this;
stateMachine.content = content;
stateMachine.args = args;
stateMachine.progressHandler = progressHandler;
stateMachine.async = async;
stateMachine.cancellationToken = cancellationToken;
stateMachine.<>1__state = -1;
stateMachine.<>t__builder.Start(ref stateMachine);
return stateMachine.<>t__builder.Task;
}
[AsyncStateMachine(typeof(PartitionedUploader<, >.<UploadInternal>d__32))]
public Task<Response<TCompleteUploadReturn>> UploadInternal(Stream content, long? expectedContentLength, TServiceSpecificData args, IProgress<long> progressHandler, bool async, CancellationToken cancellationToken = default(CancellationToken))
{
<UploadInternal>d__32 stateMachine = default(<UploadInternal>d__32);
stateMachine.<>t__builder = AsyncTaskMethodBuilder<Response<TCompleteUploadReturn>>.Create();
stateMachine.<>4__this = this;
stateMachine.content = content;
stateMachine.expectedContentLength = expectedContentLength;
stateMachine.args = args;
stateMachine.progressHandler = progressHandler;
stateMachine.async = async;
stateMachine.cancellationToken = cancellationToken;
stateMachine.<>1__state = -1;
stateMachine.<>t__builder.Start(ref stateMachine);
return stateMachine.<>t__builder.Task;
}
private static long GetActualBlockSize(long? blockSize, long? totalLength)
{
long? nullable = blockSize;
if (!nullable.HasValue) {
long? nullable2 = totalLength;
long num = 104857600;
return ((nullable2.GetValueOrDefault() < num) & nullable2.HasValue) ? 4194304 : 8388608;
}
return nullable.GetValueOrDefault();
}
[AsyncStateMachine(typeof(PartitionedUploader<, >.<BufferAndOptionalChecksumStreamInternal>d__34))]
private Task<(Stream Stream, UploadTransferValidationOptions ValidationOptions)> BufferAndOptionalChecksumStreamInternal(Stream source, long? count, UploadTransferValidationOptions validationOptions, bool async, CancellationToken cancellationToken)
{
<BufferAndOptionalChecksumStreamInternal>d__34 stateMachine = default(<BufferAndOptionalChecksumStreamInternal>d__34);
stateMachine.<>t__builder = AsyncTaskMethodBuilder<(Stream, UploadTransferValidationOptions)>.Create();
stateMachine.<>4__this = this;
stateMachine.source = source;
stateMachine.count = count;
stateMachine.validationOptions = validationOptions;
stateMachine.async = async;
stateMachine.cancellationToken = cancellationToken;
stateMachine.<>1__state = -1;
stateMachine.<>t__builder.Start(ref stateMachine);
return stateMachine.<>t__builder.Task;
}
[AsyncStateMachine(typeof(<UploadPartitionsInternal>d__35<>))]
private Task<Response<TCompleteUploadReturn>> UploadPartitionsInternal<TContent>(IAsyncEnumerable<ContentPartition<TContent>> contentPartitions, TServiceSpecificData args, IProgress<long> progressHandler, StageContentPartitionAsync<TContent> stageContentAsync, bool async, CancellationToken cancellationToken)
{
<UploadPartitionsInternal>d__35<TContent> stateMachine = default(<UploadPartitionsInternal>d__35<TContent>);
stateMachine.<>t__builder = AsyncTaskMethodBuilder<Response<TCompleteUploadReturn>>.Create();
stateMachine.<>4__this = this;
stateMachine.contentPartitions = contentPartitions;
stateMachine.args = args;
stateMachine.progressHandler = progressHandler;
stateMachine.stageContentAsync = stageContentAsync;
stateMachine.async = async;
stateMachine.cancellationToken = cancellationToken;
stateMachine.<>1__state = -1;
stateMachine.<>t__builder.Start(ref stateMachine);
return stateMachine.<>t__builder.Task;
}
[AsyncStateMachine(typeof(<UploadInSequenceInternal>d__36<>))]
private Task<Response<TCompleteUploadReturn>> UploadInSequenceInternal<TContent>(IAsyncEnumerable<ContentPartition<TContent>> contentPartitions, TServiceSpecificData args, IProgress<long> progressHandler, StageContentPartitionAsync<TContent> stageContentAsync, bool async, CancellationToken cancellationToken)
{
<UploadInSequenceInternal>d__36<TContent> stateMachine = default(<UploadInSequenceInternal>d__36<TContent>);
stateMachine.<>t__builder = AsyncTaskMethodBuilder<Response<TCompleteUploadReturn>>.Create();
stateMachine.<>4__this = this;
stateMachine.contentPartitions = contentPartitions;
stateMachine.args = args;
stateMachine.progressHandler = progressHandler;
stateMachine.stageContentAsync = stageContentAsync;
stateMachine.async = async;
stateMachine.cancellationToken = cancellationToken;
stateMachine.<>1__state = -1;
stateMachine.<>t__builder.Start(ref stateMachine);
return stateMachine.<>t__builder.Task;
}
[AsyncStateMachine(typeof(<UploadInParallelAsync>d__37<>))]
private Task<Response<TCompleteUploadReturn>> UploadInParallelAsync<TContent>(IAsyncEnumerable<ContentPartition<TContent>> contentPartitions, TServiceSpecificData args, IProgress<long> progressHandler, StageContentPartitionAsync<TContent> stageContentAsync, CancellationToken cancellationToken)
{
<UploadInParallelAsync>d__37<TContent> stateMachine = default(<UploadInParallelAsync>d__37<TContent>);
stateMachine.<>t__builder = AsyncTaskMethodBuilder<Response<TCompleteUploadReturn>>.Create();
stateMachine.<>4__this = this;
stateMachine.contentPartitions = contentPartitions;
stateMachine.args = args;
stateMachine.progressHandler = progressHandler;
stateMachine.stageContentAsync = stageContentAsync;
stateMachine.cancellationToken = cancellationToken;
stateMachine.<>1__state = -1;
stateMachine.<>t__builder.Start(ref stateMachine);
return stateMachine.<>t__builder.Task;
}
[AsyncStateMachine(typeof(PartitionedUploader<, >.<StageStreamPartitionInternal>d__38))]
private Task StageStreamPartitionInternal(Stream partition, long offset, TServiceSpecificData args, UploadTransferValidationOptions validationOptions, IProgress<long> progressHandler, bool async, CancellationToken cancellationToken)
{
<StageStreamPartitionInternal>d__38 stateMachine = default(<StageStreamPartitionInternal>d__38);
stateMachine.<>t__builder = AsyncTaskMethodBuilder.Create();
stateMachine.<>4__this = this;
stateMachine.partition = partition;
stateMachine.offset = offset;
stateMachine.args = args;
stateMachine.validationOptions = validationOptions;
stateMachine.progressHandler = progressHandler;
stateMachine.async = async;
stateMachine.cancellationToken = cancellationToken;
stateMachine.<>1__state = -1;
stateMachine.<>t__builder.Start<<StageStreamPartitionInternal>d__38>(ref stateMachine);
return stateMachine.<>t__builder.Task;
}
[AsyncStateMachine(typeof(PartitionedUploader<, >.<StageBinaryDataPartitionInternal>d__39))]
private Task StageBinaryDataPartitionInternal(BinaryData content, long offset, TServiceSpecificData args, UploadTransferValidationOptions validationOptions, IProgress<long> progressHandler, bool async, CancellationToken cancellationToken)
{
<StageBinaryDataPartitionInternal>d__39 stateMachine = default(<StageBinaryDataPartitionInternal>d__39);
stateMachine.<>t__builder = AsyncTaskMethodBuilder.Create();
stateMachine.<>4__this = this;
stateMachine.content = content;
stateMachine.offset = offset;
stateMachine.args = args;
stateMachine.validationOptions = validationOptions;
stateMachine.progressHandler = progressHandler;
stateMachine.async = async;
stateMachine.cancellationToken = cancellationToken;
stateMachine.<>1__state = -1;
stateMachine.<>t__builder.Start<<StageBinaryDataPartitionInternal>d__39>(ref stateMachine);
return stateMachine.<>t__builder.Task;
}
[AsyncIteratorStateMachine(typeof(PartitionedUploader<, >.<GetContentPartitionsBinaryDataInternal>d__40))]
private IAsyncEnumerable<ContentPartition<BinaryData>> GetContentPartitionsBinaryDataInternal(BinaryData content, long? contentLength, long blockSize, bool async, [EnumeratorCancellation] CancellationToken cancellationToken)
{
<GetContentPartitionsBinaryDataInternal>d__40 <GetContentPartitionsBinaryDataInternal>d__ = new <GetContentPartitionsBinaryDataInternal>d__40(-2);
<GetContentPartitionsBinaryDataInternal>d__.<>4__this = this;
<GetContentPartitionsBinaryDataInternal>d__.<>3__content = content;
<GetContentPartitionsBinaryDataInternal>d__.<>3__blockSize = blockSize;
<GetContentPartitionsBinaryDataInternal>d__.<>3__async = async;
return <GetContentPartitionsBinaryDataInternal>d__;
}
private IEnumerable<ContentPartition<BinaryData>> GetBinaryDataPartitions(BinaryData content, int blockSize)
{
int position = 0;
ReadOnlyMemory<byte> remaining = content.ToMemory();
while (!remaining.IsEmpty) {
ReadOnlyMemory<byte> next;
if (remaining.Length <= blockSize) {
next = remaining;
remaining = ReadOnlyMemory<byte>.Empty;
} else {
next = remaining.Slice(0, blockSize);
remaining = remaining.Slice(blockSize);
}
BinaryData content2 = BinaryData.FromBytes(next);
ContentHasher.GetHashResult hashOrDefault = ContentHasher.GetHashOrDefault(content2, this.ValidationOptions);
yield return new ContentPartition<BinaryData>((long)position, (long)next.Length, content2, hashOrDefault?.Checksum ?? ReadOnlyMemory<byte>.Empty);
position += next.Length;
next = default(ReadOnlyMemory<byte>);
}
}
[AsyncIteratorStateMachine(typeof(PartitionedUploader<, >.<GetStreamPartitionsAsync>d__42))]
private static IAsyncEnumerable<ContentPartition<Stream>> GetStreamPartitionsAsync(Stream stream, long? streamLength, long blockSize, GetNextStreamPartition getNextPartition, bool async, [EnumeratorCancellation] CancellationToken cancellationToken)
{
<GetStreamPartitionsAsync>d__42 <GetStreamPartitionsAsync>d__ = new <GetStreamPartitionsAsync>d__42(-2);
<GetStreamPartitionsAsync>d__.<>3__stream = stream;
<GetStreamPartitionsAsync>d__.<>3__streamLength = streamLength;
<GetStreamPartitionsAsync>d__.<>3__blockSize = blockSize;
<GetStreamPartitionsAsync>d__.<>3__getNextPartition = getNextPartition;
<GetStreamPartitionsAsync>d__.<>3__async = async;
<GetStreamPartitionsAsync>d__.<>3__cancellationToken = cancellationToken;
return <GetStreamPartitionsAsync>d__;
}
[AsyncStateMachine(typeof(PartitionedUploader<, >.<GetBufferedPartitionInternal>d__43))]
private Task<(Stream PartitionContent, ReadOnlyMemory<byte> PartitionChecksum)> GetBufferedPartitionInternal(Stream stream, long count, long absolutePosition, bool async, CancellationToken cancellationToken)
{
<GetBufferedPartitionInternal>d__43 stateMachine = default(<GetBufferedPartitionInternal>d__43);
stateMachine.<>t__builder = AsyncTaskMethodBuilder<(Stream, ReadOnlyMemory<byte>)>.Create();
stateMachine.<>4__this = this;
stateMachine.stream = stream;
stateMachine.count = count;
stateMachine.async = async;
stateMachine.cancellationToken = cancellationToken;
stateMachine.<>1__state = -1;
stateMachine.<>t__builder.Start(ref stateMachine);
return stateMachine.<>t__builder.Task;
}
[AsyncStateMachine(typeof(PartitionedUploader<, >.<GetStreamedPartitionInternal>d__44))]
private Task<(Stream PartitionContent, ReadOnlyMemory<byte> PartitionChecksum)> GetStreamedPartitionInternal(Stream stream, long count, long absolutePosition, bool async, CancellationToken cancellationToken)
{
<GetStreamedPartitionInternal>d__44 stateMachine = default(<GetStreamedPartitionInternal>d__44);
stateMachine.<>t__builder = AsyncTaskMethodBuilder<(Stream, ReadOnlyMemory<byte>)>.Create();
stateMachine.<>4__this = this;
stateMachine.stream = stream;
stateMachine.count = count;
stateMachine.async = async;
stateMachine.cancellationToken = cancellationToken;
stateMachine.<>1__state = -1;
stateMachine.<>t__builder.Start(ref stateMachine);
return stateMachine.<>t__builder.Task;
}
}
}