<PackageReference Include="Azure.Storage.Blobs" Version="12.25.0" />

PartitionedUploader<TServiceSpecificData, TCompleteUploadReturn>

class PartitionedUploader<TServiceSpecificData, TCompleteUploadReturn>
// NOTE(review): this text appears to be decompiler output rather than hand-written source:
// property accessors are invoked as get_X()/set_X(), the constructor contains goto/IL_ labels,
// and every async/iterator method is only the compiler-generated stub that fills in a
// "<Name>d__NN" state machine. Those state machine types are not part of this text, so the
// actual logic of the async methods cannot be read from here.
using Azure.Core.Pipeline;
using Azure.Storage.Common;
using System;
using System.Buffers;
using System.Collections.Generic;
using System.IO;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

namespace Azure.Storage
{
    /// <summary>
    /// Holds a set of service-specific upload delegates together with transfer and checksum
    /// settings, and exposes <c>UploadInternal</c> entry points for <c>BinaryData</c> and
    /// <c>Stream</c> content.
    /// NOTE(review): member names suggest the uploader chooses between one single upload and a
    /// partitioned (staged, then committed) upload; the decision logic lives in state machines
    /// that are not visible here - confirm against the original source.
    /// </summary>
    /// <typeparam name="TServiceSpecificData">Opaque argument object passed through to every behavior delegate.</typeparam>
    /// <typeparam name="TCompleteUploadReturn">Payload type of the response returned by the single-upload and commit delegates.</typeparam>
    internal class PartitionedUploader<TServiceSpecificData, TCompleteUploadReturn>
    {
        /// <summary>
        /// Immutable description of one partition: its absolute position, its length, the
        /// content itself and a checksum for that content.
        /// </summary>
        private readonly struct ContentPartition<TContent>
        {
            public long AbsolutePosition { get; }
            public long Length { get; }
            public TContent Content { get; }
            public ReadOnlyMemory<byte> ContentChecksum { get; }
            public ContentPartition(long position, long length, TContent content, ReadOnlyMemory<byte> contentChecksum)
            {
                AbsolutePosition = position;
                Length = length;
                Content = content;
                ContentChecksum = contentChecksum;
            }
        }

        // Signature used by the UploadPartitions/UploadInSequence/UploadInParallel methods for the
        // callback that stages one partition of TContent at a given offset.
        private delegate Task StageContentPartitionAsync<TContent> (TContent content, long offset, TServiceSpecificData args, UploadTransferValidationOptions validationOptions, IProgress<long> progressHandler, bool async, CancellationToken cancellationToken);

        // Signature of the callback GetStreamPartitionsAsync receives to obtain the next partition
        // (content stream plus checksum). GetBufferedPartitionInternal and
        // GetStreamedPartitionInternal below have this shape.
        private delegate Task<(Stream PartitionContent, ReadOnlyMemory<byte> PartitionChecksum)> GetNextStreamPartition (Stream stream, long count, long absolutePosition, bool async, CancellationToken cancellationToken);

        // Creates a DiagnosticScope for the given operation name.
        public delegate DiagnosticScope CreateScope (string operationName);

        // Optional behavior; the constructor substitutes InitializeNoOp when none is supplied.
        public delegate Task InitializeDestinationInternal (TServiceSpecificData args, bool async, CancellationToken cancellationToken);

        // Uploads a whole Stream and returns the completed-upload response.
        public delegate Task<Response<TCompleteUploadReturn>> SingleUploadStreamingInternal (Stream contentStream, TServiceSpecificData args, IProgress<long> progressHandler, UploadTransferValidationOptions transferValidation, string operationName, bool async, CancellationToken cancellationToken);

        // Uploads a whole BinaryData and returns the completed-upload response.
        public delegate Task<Response<TCompleteUploadReturn>> SingleUploadBinaryDataInternal (BinaryData content, TServiceSpecificData args, IProgress<long> progressHandler, UploadTransferValidationOptions transferValidation, string operationName, bool async, CancellationToken cancellationToken);

        // Uploads one Stream partition at the given offset.
        public delegate Task UploadPartitionStreamingInternal (Stream contentStream, long offset, TServiceSpecificData args, IProgress<long> progressHandler, UploadTransferValidationOptions transferValidation, bool async, CancellationToken cancellationToken);

        // Uploads one BinaryData partition at the given offset.
        public delegate Task UploadPartitionBinaryDataInternal (BinaryData content, long offset, TServiceSpecificData args, IProgress<long> progressHandler, UploadTransferValidationOptions transferValidation, bool async, CancellationToken cancellationToken);

        // Receives the list of (Offset, Size) partitions and returns the completed-upload response.
        public delegate Task<Response<TCompleteUploadReturn>> CommitPartitionedUploadInternal (List<(long Offset, long Size)> partitions, TServiceSpecificData args, bool async, CancellationToken cancellationToken);

        /// <summary>
        /// Bundle of delegates handed to the constructor. InitializeDestination may be left null;
        /// every other member is passed through Argument.CheckNotNull by the constructor.
        /// NOTE(review): the [IsReadOnly] markers on the getters look like decompiler rendering
        /// of compiler-emitted metadata - confirm before treating this as compilable source.
        /// </summary>
        public struct Behaviors
        {
            public InitializeDestinationInternal InitializeDestination { [IsReadOnly] get; set; }
            public SingleUploadStreamingInternal SingleUploadStreaming { [IsReadOnly] get; set; }
            public SingleUploadBinaryDataInternal SingleUploadBinaryData { [IsReadOnly] get; set; }
            public UploadPartitionStreamingInternal UploadPartitionStreaming { [IsReadOnly] get; set; }
            public UploadPartitionBinaryDataInternal UploadPartitionBinaryData { [IsReadOnly] get; set; }
            public CommitPartitionedUploadInternal CommitPartitionedUpload { [IsReadOnly] get; set; }
            public CreateScope Scope { [IsReadOnly] get; set; }
        }

        /// <summary>Destination initializer that does nothing and returns an already-completed task.</summary>
        public static readonly InitializeDestinationInternal InitializeNoOp = (TServiceSpecificData args, bool async, CancellationToken cancellationToken) => Task.CompletedTask;

        // Behavior delegates captured from the Behaviors struct in the constructor.
        private readonly InitializeDestinationInternal _initializeDestinationInternal;
        private readonly SingleUploadStreamingInternal _singleUploadStreamingInternal;
        private readonly SingleUploadBinaryDataInternal _singleUploadBinaryDataInternal;
        private readonly UploadPartitionStreamingInternal _uploadPartitionStreamingInternal;
        private readonly UploadPartitionBinaryDataInternal _uploadPartitionBinaryDataInternal;
        private readonly CommitPartitionedUploadInternal _commitPartitionedUploadInternal;
        private readonly CreateScope _createScope;

        // Taken from transferOptions.MaximumConcurrency when positive, otherwise 5.
        private readonly int _maxWorkerCount;

        // Supplied pool, or ArrayPool<byte>.Shared when none was given.
        private readonly ArrayPool<byte> _arrayPool;

        // Taken from transferOptions.InitialTransferSize (capped) when positive, otherwise 268435456.
        private readonly long _singleUploadThreshold;

        // Taken from transferOptions.MaximumTransferSize (capped) when positive; null otherwise,
        // in which case GetActualBlockSize picks a default.
        private readonly long? _blockSize;

        // Checksum algorithm after ResolveAuto() has been applied.
        private readonly StorageChecksumAlgorithm _validationAlgorithm;

        // Only assigned by the constructor when a precalculated checksum was supplied; it then
        // returns the constructor's private copy of that checksum.
        private Func<Memory<byte>> _masterCrcSupplier;

        private readonly string _operationName;

        // True when the resolved algorithm's numeric value is 3.
        // NOTE(review): 3 presumably identifies the CRC64 member of StorageChecksumAlgorithm -
        // confirm against the enum definition.
        private bool UseMasterCrc => (int)_validationAlgorithm.ResolveAuto() == 3;

        // Builds a fresh UploadTransferValidationOptions on every access, carrying only the
        // resolved checksum algorithm.
        private UploadTransferValidationOptions ValidationOptions
        {
            get
            {
                UploadTransferValidationOptions val = new UploadTransferValidationOptions();
                val.set_ChecksumAlgorithm(_validationAlgorithm);
                return val;
            }
        }

        /// <summary>
        /// Captures the behavior delegates and derives worker count, single-upload threshold,
        /// block size and checksum settings from the supplied options.
        /// </summary>
        /// <param name="behaviors">Delegates to use; all except InitializeDestination go through Argument.CheckNotNull.</param>
        /// <param name="transferOptions">Source of MaximumConcurrency, InitialTransferSize and MaximumTransferSize.</param>
        /// <param name="transferValidation">Checksum options; passed through Argument.CheckNotNull before use.</param>
        /// <param name="arrayPool">Buffer pool; ArrayPool&lt;byte&gt;.Shared is used when null.</param>
        /// <param name="operationName">Stored as-is in _operationName.</param>
        /// <remarks>
        /// Throws whatever Errors.PrecalculatedHashNotSupportedOnSplit() creates when a
        /// precalculated checksum is supplied and UseMasterCrc is false.
        /// NOTE(review): Argument.CheckNotNull presumably throws on null - its definition is not
        /// visible here.
        /// </remarks>
        public PartitionedUploader(Behaviors behaviors, StorageTransferOptions transferOptions, UploadTransferValidationOptions transferValidation, ArrayPool<byte> arrayPool = null, string operationName = null)
        {
            _initializeDestinationInternal = (behaviors.InitializeDestination ?? InitializeNoOp);
            _singleUploadStreamingInternal = Argument.CheckNotNull<SingleUploadStreamingInternal>(behaviors.SingleUploadStreaming, "SingleUploadStreaming");
            _singleUploadBinaryDataInternal = Argument.CheckNotNull<SingleUploadBinaryDataInternal>(behaviors.SingleUploadBinaryData, "SingleUploadBinaryData");
            _uploadPartitionStreamingInternal = Argument.CheckNotNull<UploadPartitionStreamingInternal>(behaviors.UploadPartitionStreaming, "UploadPartitionStreaming");
            _uploadPartitionBinaryDataInternal = Argument.CheckNotNull<UploadPartitionBinaryDataInternal>(behaviors.UploadPartitionBinaryData, "UploadPartitionBinaryData");
            _commitPartitionedUploadInternal = Argument.CheckNotNull<CommitPartitionedUploadInternal>(behaviors.CommitPartitionedUpload, "CommitPartitionedUpload");
            _createScope = Argument.CheckNotNull<CreateScope>(behaviors.Scope, "Scope");
            _arrayPool = (arrayPool ?? ArrayPool<byte>.Shared);
            // Worker count: use MaximumConcurrency only when it is set and greater than zero;
            // every other path falls through to the default of 5. Both paths continue at IL_0104.
            if (transferOptions.get_MaximumConcurrency().HasValue)
            {
                int? maximumConcurrency = transferOptions.get_MaximumConcurrency();
                int num = 0;
                if ((maximumConcurrency.GetValueOrDefault() > num) & maximumConcurrency.HasValue)
                {
                    _maxWorkerCount = transferOptions.get_MaximumConcurrency().Value;
                    goto IL_0104;
                }
            }
            _maxWorkerCount = 5;
            goto IL_0104;
            IL_0104:
            // Single-upload threshold: a positive InitialTransferSize capped at 5242880000
            // (5000 * 1024 * 1024); otherwise 268435456 (256 * 1024 * 1024).
            if (transferOptions.get_InitialTransferSize().HasValue && transferOptions.get_InitialTransferSize().Value > 0)
                _singleUploadThreshold = Math.Min(transferOptions.get_InitialTransferSize().Value, 5242880000);
            else
                _singleUploadThreshold = 268435456;
            // Block size: a positive MaximumTransferSize capped at 4194304000 (4000 * 1024 * 1024);
            // otherwise _blockSize keeps its default of null.
            if (transferOptions.get_MaximumTransferSize().HasValue)
            {
                long? maximumTransferSize = transferOptions.get_MaximumTransferSize();
                long num2 = 0;
                if ((maximumTransferSize.GetValueOrDefault() > num2) & maximumTransferSize.HasValue)
                    _blockSize = Math.Min(4194304000, transferOptions.get_MaximumTransferSize().Value);
            }
            _validationAlgorithm = Argument.CheckNotNull<UploadTransferValidationOptions>(transferValidation, "transferValidation").get_ChecksumAlgorithm().ResolveAuto();
            ReadOnlyMemory<byte> precalculatedChecksum = transferValidation.get_PrecalculatedChecksum();
            if (!precalculatedChecksum.IsEmpty)
            {
                // A caller-supplied checksum is only accepted when UseMasterCrc is true.
                if (!UseMasterCrc)
                    throw Errors.PrecalculatedHashNotSupportedOnSplit();
                // The property is re-read before each use below (as emitted by the decompiler).
                precalculatedChecksum = transferValidation.get_PrecalculatedChecksum();
                // Copy into a privately owned buffer so the supplier does not hand out the
                // caller's memory.
                Memory<byte> userSuppliedMasterCrc = new Memory<byte>(new byte[precalculatedChecksum.Length]);
                precalculatedChecksum = transferValidation.get_PrecalculatedChecksum();
                precalculatedChecksum.CopyTo(userSuppliedMasterCrc);
                _masterCrcSupplier = (() => userSuppliedMasterCrc);
            }
            _operationName = operationName;
        }

        /// <summary>
        /// Upload entry point for BinaryData content. Only the compiler-generated stub is
        /// visible: it copies the arguments into state machine d__31 and starts it.
        /// </summary>
        [AsyncStateMachine(typeof(PartitionedUploader<, >.<UploadInternal>d__31))]
        public Task<Response<TCompleteUploadReturn>> UploadInternal(BinaryData content, TServiceSpecificData args, IProgress<long> progressHandler, bool async, CancellationToken cancellationToken = default(CancellationToken))
        {
            <UploadInternal>d__31 stateMachine = default(<UploadInternal>d__31);
            stateMachine.<>t__builder = AsyncTaskMethodBuilder<Response<TCompleteUploadReturn>>.Create();
            stateMachine.<>4__this = this;
            stateMachine.content = content;
            stateMachine.args = args;
            stateMachine.progressHandler = progressHandler;
            stateMachine.async = async;
            stateMachine.cancellationToken = cancellationToken;
            stateMachine.<>1__state = -1;
            stateMachine.<>t__builder.Start(ref stateMachine);
            return stateMachine.<>t__builder.Task;
        }

        /// <summary>
        /// Upload entry point for Stream content with an optional expected length. Only the
        /// compiler-generated stub is visible: it copies the arguments into state machine d__32
        /// and starts it.
        /// </summary>
        [AsyncStateMachine(typeof(PartitionedUploader<, >.<UploadInternal>d__32))]
        public Task<Response<TCompleteUploadReturn>> UploadInternal(Stream content, long? expectedContentLength, TServiceSpecificData args, IProgress<long> progressHandler, bool async, CancellationToken cancellationToken = default(CancellationToken))
        {
            <UploadInternal>d__32 stateMachine = default(<UploadInternal>d__32);
            stateMachine.<>t__builder = AsyncTaskMethodBuilder<Response<TCompleteUploadReturn>>.Create();
            stateMachine.<>4__this = this;
            stateMachine.content = content;
            stateMachine.expectedContentLength = expectedContentLength;
            stateMachine.args = args;
            stateMachine.progressHandler = progressHandler;
            stateMachine.async = async;
            stateMachine.cancellationToken = cancellationToken;
            stateMachine.<>1__state = -1;
            stateMachine.<>t__builder.Start(ref stateMachine);
            return stateMachine.<>t__builder.Task;
        }

        /// <summary>
        /// Returns <paramref name="blockSize"/> when it has a value. Otherwise returns 4194304
        /// (4 * 1024 * 1024) when <paramref name="totalLength"/> is known and below 104857600
        /// (100 * 1024 * 1024), and 8388608 (8 * 1024 * 1024) in every other case, including an
        /// unknown total length.
        /// </summary>
        private static long GetActualBlockSize(long? blockSize, long? totalLength)
        {
            long? nullable = blockSize;
            if (!nullable.HasValue)
            {
                long? nullable2 = totalLength;
                long num = 104857600;
                return ((nullable2.GetValueOrDefault() < num) & nullable2.HasValue) ? 4194304 : 8388608;
            }
            return nullable.GetValueOrDefault();
        }

        // Compiler-generated stub for an async method returning a (Stream, validation options)
        // pair; the logic is in state machine d__34, which is not part of this text.
        [AsyncStateMachine(typeof(PartitionedUploader<, >.<BufferAndOptionalChecksumStreamInternal>d__34))]
        private Task<(Stream Stream, UploadTransferValidationOptions ValidationOptions)> BufferAndOptionalChecksumStreamInternal(Stream source, long? count, UploadTransferValidationOptions validationOptions, bool async, CancellationToken cancellationToken)
        {
            <BufferAndOptionalChecksumStreamInternal>d__34 stateMachine = default(<BufferAndOptionalChecksumStreamInternal>d__34);
            stateMachine.<>t__builder = AsyncTaskMethodBuilder<(Stream, UploadTransferValidationOptions)>.Create();
            stateMachine.<>4__this = this;
            stateMachine.source = source;
            stateMachine.count = count;
            stateMachine.validationOptions = validationOptions;
            stateMachine.async = async;
            stateMachine.cancellationToken = cancellationToken;
            stateMachine.<>1__state = -1;
            stateMachine.<>t__builder.Start(ref stateMachine);
            return stateMachine.<>t__builder.Task;
        }

        // Compiler-generated stub; takes the partition sequence plus the staging callback and
        // forwards everything to state machine d__35 (not part of this text).
        [AsyncStateMachine(typeof(<UploadPartitionsInternal>d__35<>))]
        private Task<Response<TCompleteUploadReturn>> UploadPartitionsInternal<TContent>(IAsyncEnumerable<ContentPartition<TContent>> contentPartitions, TServiceSpecificData args, IProgress<long> progressHandler, StageContentPartitionAsync<TContent> stageContentAsync, bool async, CancellationToken cancellationToken)
        {
            <UploadPartitionsInternal>d__35<TContent> stateMachine = default(<UploadPartitionsInternal>d__35<TContent>);
            stateMachine.<>t__builder = AsyncTaskMethodBuilder<Response<TCompleteUploadReturn>>.Create();
            stateMachine.<>4__this = this;
            stateMachine.contentPartitions = contentPartitions;
            stateMachine.args = args;
            stateMachine.progressHandler = progressHandler;
            stateMachine.stageContentAsync = stageContentAsync;
            stateMachine.async = async;
            stateMachine.cancellationToken = cancellationToken;
            stateMachine.<>1__state = -1;
            stateMachine.<>t__builder.Start(ref stateMachine);
            return stateMachine.<>t__builder.Task;
        }

        // Compiler-generated stub with the same parameters as UploadPartitionsInternal; logic is
        // in state machine d__36 (not part of this text).
        [AsyncStateMachine(typeof(<UploadInSequenceInternal>d__36<>))]
        private Task<Response<TCompleteUploadReturn>> UploadInSequenceInternal<TContent>(IAsyncEnumerable<ContentPartition<TContent>> contentPartitions, TServiceSpecificData args, IProgress<long> progressHandler, StageContentPartitionAsync<TContent> stageContentAsync, bool async, CancellationToken cancellationToken)
        {
            <UploadInSequenceInternal>d__36<TContent> stateMachine = default(<UploadInSequenceInternal>d__36<TContent>);
            stateMachine.<>t__builder = AsyncTaskMethodBuilder<Response<TCompleteUploadReturn>>.Create();
            stateMachine.<>4__this = this;
            stateMachine.contentPartitions = contentPartitions;
            stateMachine.args = args;
            stateMachine.progressHandler = progressHandler;
            stateMachine.stageContentAsync = stageContentAsync;
            stateMachine.async = async;
            stateMachine.cancellationToken = cancellationToken;
            stateMachine.<>1__state = -1;
            stateMachine.<>t__builder.Start(ref stateMachine);
            return stateMachine.<>t__builder.Task;
        }

        // Compiler-generated stub; unlike the two methods above it has no "async" flag
        // parameter. Logic is in state machine d__37 (not part of this text).
        [AsyncStateMachine(typeof(<UploadInParallelAsync>d__37<>))]
        private Task<Response<TCompleteUploadReturn>> UploadInParallelAsync<TContent>(IAsyncEnumerable<ContentPartition<TContent>> contentPartitions, TServiceSpecificData args, IProgress<long> progressHandler, StageContentPartitionAsync<TContent> stageContentAsync, CancellationToken cancellationToken)
        {
            <UploadInParallelAsync>d__37<TContent> stateMachine = default(<UploadInParallelAsync>d__37<TContent>);
            stateMachine.<>t__builder = AsyncTaskMethodBuilder<Response<TCompleteUploadReturn>>.Create();
            stateMachine.<>4__this = this;
            stateMachine.contentPartitions = contentPartitions;
            stateMachine.args = args;
            stateMachine.progressHandler = progressHandler;
            stateMachine.stageContentAsync = stageContentAsync;
            stateMachine.cancellationToken = cancellationToken;
            stateMachine.<>1__state = -1;
            stateMachine.<>t__builder.Start(ref stateMachine);
            return stateMachine.<>t__builder.Task;
        }

        // Compiler-generated stub whose signature matches StageContentPartitionAsync<Stream>;
        // logic is in state machine d__38 (not part of this text).
        [AsyncStateMachine(typeof(PartitionedUploader<, >.<StageStreamPartitionInternal>d__38))]
        private Task StageStreamPartitionInternal(Stream partition, long offset, TServiceSpecificData args, UploadTransferValidationOptions validationOptions, IProgress<long> progressHandler, bool async, CancellationToken cancellationToken)
        {
            <StageStreamPartitionInternal>d__38 stateMachine = default(<StageStreamPartitionInternal>d__38);
            stateMachine.<>t__builder = AsyncTaskMethodBuilder.Create();
            stateMachine.<>4__this = this;
            stateMachine.partition = partition;
            stateMachine.offset = offset;
            stateMachine.args = args;
            stateMachine.validationOptions = validationOptions;
            stateMachine.progressHandler = progressHandler;
            stateMachine.async = async;
            stateMachine.cancellationToken = cancellationToken;
            stateMachine.<>1__state = -1;
            stateMachine.<>t__builder.Start<<StageStreamPartitionInternal>d__38>(ref stateMachine);
            return stateMachine.<>t__builder.Task;
        }

        // Compiler-generated stub whose signature matches StageContentPartitionAsync<BinaryData>;
        // logic is in state machine d__39 (not part of this text).
        [AsyncStateMachine(typeof(PartitionedUploader<, >.<StageBinaryDataPartitionInternal>d__39))]
        private Task StageBinaryDataPartitionInternal(BinaryData content, long offset, TServiceSpecificData args, UploadTransferValidationOptions validationOptions, IProgress<long> progressHandler, bool async, CancellationToken cancellationToken)
        {
            <StageBinaryDataPartitionInternal>d__39 stateMachine = default(<StageBinaryDataPartitionInternal>d__39);
            stateMachine.<>t__builder = AsyncTaskMethodBuilder.Create();
            stateMachine.<>4__this = this;
            stateMachine.content = content;
            stateMachine.offset = offset;
            stateMachine.args = args;
            stateMachine.validationOptions = validationOptions;
            stateMachine.progressHandler = progressHandler;
            stateMachine.async = async;
            stateMachine.cancellationToken = cancellationToken;
            stateMachine.<>1__state = -1;
            stateMachine.<>t__builder.Start<<StageBinaryDataPartitionInternal>d__39>(ref stateMachine);
            return stateMachine.<>t__builder.Task;
        }

        // Compiler-generated async-iterator stub. Only content, blockSize and async are copied
        // into state machine d__40 here; contentLength and cancellationToken are not.
        [AsyncIteratorStateMachine(typeof(PartitionedUploader<, >.<GetContentPartitionsBinaryDataInternal>d__40))]
        private IAsyncEnumerable<ContentPartition<BinaryData>> GetContentPartitionsBinaryDataInternal(BinaryData content, long? contentLength, long blockSize, bool async, [EnumeratorCancellation] CancellationToken cancellationToken)
        {
            <GetContentPartitionsBinaryDataInternal>d__40 <GetContentPartitionsBinaryDataInternal>d__ = new <GetContentPartitionsBinaryDataInternal>d__40(-2);
            <GetContentPartitionsBinaryDataInternal>d__.<>4__this = this;
            <GetContentPartitionsBinaryDataInternal>d__.<>3__content = content;
            <GetContentPartitionsBinaryDataInternal>d__.<>3__blockSize = blockSize;
            <GetContentPartitionsBinaryDataInternal>d__.<>3__async = async;
            return <GetContentPartitionsBinaryDataInternal>d__;
        }

        /// <summary>
        /// Lazily splits <paramref name="content"/> into consecutive slices of at most
        /// <paramref name="blockSize"/> bytes and yields one ContentPartition per slice, carrying
        /// the slice's start position, its length, the slice wrapped as BinaryData and the
        /// checksum returned by ContentHasher.GetHashOrDefault (an empty checksum when that call
        /// returns null). Nothing is yielded for empty content.
        /// </summary>
        private IEnumerable<ContentPartition<BinaryData>> GetBinaryDataPartitions(BinaryData content, int blockSize)
        {
            int position = 0;
            ReadOnlyMemory<byte> remaining = content.ToMemory();
            while (!remaining.IsEmpty)
            {
                ReadOnlyMemory<byte> next;
                if (remaining.Length <= blockSize)
                {
                    // Final slice: take everything that is left.
                    next = remaining;
                    remaining = ReadOnlyMemory<byte>.Empty;
                }
                else
                {
                    next = remaining.Slice(0, blockSize);
                    remaining = remaining.Slice(blockSize);
                }
                BinaryData content2 = BinaryData.FromBytes(next);
                ContentHasher.GetHashResult hashOrDefault = ContentHasher.GetHashOrDefault(content2, this.ValidationOptions);
                yield return new ContentPartition<BinaryData>((long)position, (long)next.Length, content2, hashOrDefault?.Checksum ?? ReadOnlyMemory<byte>.Empty);
                position += next.Length;
                // Local reset as emitted by the decompiler; "next" is reassigned before its next read.
                next = default(ReadOnlyMemory<byte>);
            }
        }

        // Compiler-generated async-iterator stub (static, so no "this" is captured); all six
        // arguments are copied into state machine d__42, which is not part of this text.
        [AsyncIteratorStateMachine(typeof(PartitionedUploader<, >.<GetStreamPartitionsAsync>d__42))]
        private static IAsyncEnumerable<ContentPartition<Stream>> GetStreamPartitionsAsync(Stream stream, long? streamLength, long blockSize, GetNextStreamPartition getNextPartition, bool async, [EnumeratorCancellation] CancellationToken cancellationToken)
        {
            <GetStreamPartitionsAsync>d__42 <GetStreamPartitionsAsync>d__ = new <GetStreamPartitionsAsync>d__42(-2);
            <GetStreamPartitionsAsync>d__.<>3__stream = stream;
            <GetStreamPartitionsAsync>d__.<>3__streamLength = streamLength;
            <GetStreamPartitionsAsync>d__.<>3__blockSize = blockSize;
            <GetStreamPartitionsAsync>d__.<>3__getNextPartition = getNextPartition;
            <GetStreamPartitionsAsync>d__.<>3__async = async;
            <GetStreamPartitionsAsync>d__.<>3__cancellationToken = cancellationToken;
            return <GetStreamPartitionsAsync>d__;
        }

        // Compiler-generated stub matching the GetNextStreamPartition signature. absolutePosition
        // is not copied into state machine d__43 here.
        [AsyncStateMachine(typeof(PartitionedUploader<, >.<GetBufferedPartitionInternal>d__43))]
        private Task<(Stream PartitionContent, ReadOnlyMemory<byte> PartitionChecksum)> GetBufferedPartitionInternal(Stream stream, long count, long absolutePosition, bool async, CancellationToken cancellationToken)
        {
            <GetBufferedPartitionInternal>d__43 stateMachine = default(<GetBufferedPartitionInternal>d__43);
            stateMachine.<>t__builder = AsyncTaskMethodBuilder<(Stream, ReadOnlyMemory<byte>)>.Create();
            stateMachine.<>4__this = this;
            stateMachine.stream = stream;
            stateMachine.count = count;
            stateMachine.async = async;
            stateMachine.cancellationToken = cancellationToken;
            stateMachine.<>1__state = -1;
            stateMachine.<>t__builder.Start(ref stateMachine);
            return stateMachine.<>t__builder.Task;
        }

        // Compiler-generated stub matching the GetNextStreamPartition signature. absolutePosition
        // is not copied into state machine d__44 here.
        [AsyncStateMachine(typeof(PartitionedUploader<, >.<GetStreamedPartitionInternal>d__44))]
        private Task<(Stream PartitionContent, ReadOnlyMemory<byte> PartitionChecksum)> GetStreamedPartitionInternal(Stream stream, long count, long absolutePosition, bool async, CancellationToken cancellationToken)
        {
            <GetStreamedPartitionInternal>d__44 stateMachine = default(<GetStreamedPartitionInternal>d__44);
            stateMachine.<>t__builder = AsyncTaskMethodBuilder<(Stream, ReadOnlyMemory<byte>)>.Create();
            stateMachine.<>4__this = this;
            stateMachine.stream = stream;
            stateMachine.count = count;
            stateMachine.async = async;
            stateMachine.cancellationToken = cancellationToken;
            stateMachine.<>1__state = -1;
            stateMachine.<>t__builder.Start(ref stateMachine);
            return stateMachine.<>t__builder.Task;
        }
    }
}