<PackageReference Include="Azure.Storage.Blobs" Version="12.24.1" />

StorageWriteStream

abstract class StorageWriteStream : Stream
using Azure.Core.Pipeline; using Azure.Storage.Common; using System; using System.Buffers; using System.IO; using System.Runtime.CompilerServices; using System.Threading; using System.Threading.Tasks; namespace Azure.Storage.Shared { internal abstract class StorageWriteStream : Stream { protected long _position; protected long _bufferSize; protected readonly IProgress<long> _progressHandler; protected readonly PooledMemoryStream _buffer; private ArrayPool<byte> _bufferPool; private readonly StorageChecksumAlgorithm _checksumAlgorithm; private StorageCrc64HashAlgorithm _masterCrcChecksummer; private Memory<byte> _composedCrc = Memory<byte>.Empty; private Memory<byte> _userProvidedChecksum = Memory<byte>.Empty; private IHasher _bufferChecksumer; private bool _disposed; private readonly DisposableBucket _accumulatedDisposables = new DisposableBucket(); private bool UseMasterCrc => (int)_checksumAlgorithm.ResolveAuto() == 3; public override bool CanRead => false; public override bool CanSeek => false; public override bool CanWrite => true; public override long Length { get { throw new NotSupportedException(); } } public override long Position { get { return _position; } set { throw new NotSupportedException(); } } protected StorageWriteStream(long position, long bufferSize, IProgress<long> progressHandler, UploadTransferValidationOptions transferValidation, PooledMemoryStream buffer = null, ArrayPool<byte> bufferPool = null) { _position = position; _bufferSize = bufferSize; _bufferPool = (bufferPool ?? ArrayPool<byte>.Shared); if (progressHandler != null) _progressHandler = new AggregatingProgressIncrementer(progressHandler); _checksumAlgorithm = Argument.CheckNotNull<UploadTransferValidationOptions>(transferValidation, "transferValidation").get_ChecksumAlgorithm(); ReadOnlyMemory<byte> precalculatedChecksum = transferValidation.get_PrecalculatedChecksum(); if (!precalculatedChecksum.IsEmpty) { if (!UseMasterCrc) throw Errors.PrecalculatedHashNotSupportedOnSplit(); DisposableBucket accumulatedDisposables = _accumulatedDisposables; ArrayPool<byte> bufferPool2 = _bufferPool; precalculatedChecksum = transferValidation.get_PrecalculatedChecksum(); accumulatedDisposables.Add(bufferPool2.RentDisposable(precalculatedChecksum.Length, out byte[] array)); array.Clear(); byte[] array2 = array; precalculatedChecksum = transferValidation.get_PrecalculatedChecksum(); _userProvidedChecksum = new Memory<byte>(array2, 0, precalculatedChecksum.Length); precalculatedChecksum = transferValidation.get_PrecalculatedChecksum(); precalculatedChecksum.CopyTo(_userProvidedChecksum); } if (UseMasterCrc) { _masterCrcChecksummer = StorageCrc64HashAlgorithm.Create(); _accumulatedDisposables.Add(_bufferPool.RentDisposable(8, out byte[] array3)); array3.Clear(); _composedCrc = new Memory<byte>(array3, 0, 8); } if (buffer != null) { if (buffer.Position != 0) throw Errors.CannotInitializeWriteStreamWithData(); _buffer = buffer; } else { _buffer = new PooledMemoryStream(_bufferPool, (int)Math.Min(1048576, bufferSize)); _accumulatedDisposables.Add(_buffer); } _bufferChecksumer = ContentHasher.GetHasherFromAlgorithmId(_checksumAlgorithm); } public override int Read(byte[] buffer, int offset, int count) { throw new NotSupportedException(); } public override long Seek(long offset, SeekOrigin origin) { throw new NotSupportedException(); } public override void SetLength(long value) { throw new NotSupportedException(); } public override void Write(byte[] buffer, int offset, int count) { WriteInternal(buffer, offset, count, false, default(CancellationToken)).EnsureCompleted(); } [AsyncStateMachine(typeof(<WriteAsync>d__30))] public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) { <WriteAsync>d__30 stateMachine = default(<WriteAsync>d__30); stateMachine.<>t__builder = AsyncTaskMethodBuilder.Create(); stateMachine.<>4__this = this; stateMachine.buffer = buffer; stateMachine.offset = offset; stateMachine.count = count; stateMachine.cancellationToken = cancellationToken; stateMachine.<>1__state = -1; stateMachine.<>t__builder.Start(ref stateMachine); return stateMachine.<>t__builder.Task; } [AsyncStateMachine(typeof(<WriteInternal>d__31))] private Task WriteInternal(byte[] buffer, int offset, int count, bool async, CancellationToken cancellationToken) { <WriteInternal>d__31 stateMachine = default(<WriteInternal>d__31); stateMachine.<>t__builder = AsyncTaskMethodBuilder.Create(); stateMachine.<>4__this = this; stateMachine.buffer = buffer; stateMachine.offset = offset; stateMachine.count = count; stateMachine.async = async; stateMachine.cancellationToken = cancellationToken; stateMachine.<>1__state = -1; stateMachine.<>t__builder.Start(ref stateMachine); return stateMachine.<>t__builder.Task; } public override void Flush() { FlushInternal(false, default(CancellationToken)).EnsureCompleted(); } [AsyncStateMachine(typeof(<FlushAsync>d__33))] public override Task FlushAsync(CancellationToken cancellationToken) { <FlushAsync>d__33 stateMachine = default(<FlushAsync>d__33); stateMachine.<>t__builder = AsyncTaskMethodBuilder.Create(); stateMachine.<>4__this = this; stateMachine.cancellationToken = cancellationToken; stateMachine.<>1__state = -1; stateMachine.<>t__builder.Start(ref stateMachine); return stateMachine.<>t__builder.Task; } [AsyncStateMachine(typeof(<FlushInternal>d__34))] private Task FlushInternal(bool async, CancellationToken cancellationToken) { <FlushInternal>d__34 stateMachine = default(<FlushInternal>d__34); stateMachine.<>t__builder = AsyncTaskMethodBuilder.Create(); stateMachine.<>4__this = this; stateMachine.async = async; stateMachine.cancellationToken = cancellationToken; stateMachine.<>1__state = -1; stateMachine.<>t__builder.Start(ref stateMachine); return stateMachine.<>t__builder.Task; } protected virtual Task CommitInternal(bool async, CancellationToken cancellationToken) { return Task.CompletedTask; } [AsyncStateMachine(typeof(<AppendAndClearBufferInternal>d__36))] private Task AppendAndClearBufferInternal(UploadTransferValidationOptions validationOptions, bool async, CancellationToken cancellationToken) { <AppendAndClearBufferInternal>d__36 stateMachine = default(<AppendAndClearBufferInternal>d__36); stateMachine.<>t__builder = AsyncTaskMethodBuilder.Create(); stateMachine.<>4__this = this; stateMachine.validationOptions = validationOptions; stateMachine.async = async; stateMachine.cancellationToken = cancellationToken; stateMachine.<>1__state = -1; stateMachine.<>t__builder.Start(ref stateMachine); return stateMachine.<>t__builder.Task; } protected abstract Task AppendInternal(UploadTransferValidationOptions validationOptions, bool async, CancellationToken cancellationToken); protected abstract void ValidateBufferSize(long bufferSize); [AsyncStateMachine(typeof(<WriteToBufferInternal>d__39))] protected Task WriteToBufferInternal(byte[] buffer, int offset, int count, bool async, CancellationToken cancellationToken) { <WriteToBufferInternal>d__39 stateMachine = default(<WriteToBufferInternal>d__39); stateMachine.<>t__builder = AsyncTaskMethodBuilder.Create(); stateMachine.<>4__this = this; stateMachine.buffer = buffer; stateMachine.offset = offset; stateMachine.count = count; stateMachine.async = async; stateMachine.cancellationToken = cancellationToken; stateMachine.<>1__state = -1; stateMachine.<>t__builder.Start(ref stateMachine); return stateMachine.<>t__builder.Task; } protected static void ValidateWriteParameters(byte[] buffer, int offset, int count) { if (buffer == null) throw new ArgumentNullException("buffer", "buffer cannot be null."); if (offset < 0) throw new ArgumentOutOfRangeException("offset", "offset cannot be less than 0."); if (offset > buffer.Length) throw new ArgumentOutOfRangeException("offset", "offset cannot be greater than buffer length."); if (count < 0) throw new ArgumentOutOfRangeException("count", "count cannot be less than 0."); if (offset + count > buffer.Length) throw new ArgumentOutOfRangeException("offset and count", "offset + count cannot exceed buffer length."); } protected override void Dispose(bool disposing) { if (!_disposed) { if (disposing) try { Flush(); ValidateCallerCrcIfAny(); } finally { _accumulatedDisposables.Dispose(); } _disposed = true; base.Dispose(disposing); } } private void ValidateCallerCrcIfAny() { if (UseMasterCrc && !_userProvidedChecksum.IsEmpty) { Span<byte> span; using (_bufferPool.RentAsSpanDisposable(_masterCrcChecksummer.get_HashLengthInBytes(), out span)) { _masterCrcChecksummer.GetCurrentHash(span); if (!span.SequenceEqual(_userProvidedChecksum.Span)) throw Errors.ChecksumMismatch(span, _userProvidedChecksum.Span); } } } protected IDisposable FinalizeAndReplaceBufferChecksum(out UploadTransferValidationOptions validationOptions) { if (_buffer.Length == 0) { UploadTransferValidationOptions val = new UploadTransferValidationOptions(); val.set_ChecksumAlgorithm(1); validationOptions = val; return null; } Memory<byte> memory = Memory<byte>.Empty; IDisposable result = null; if (_bufferChecksumer != null) { result = _bufferPool.RentDisposable(_bufferChecksumer.HashSizeInBytes, out byte[] array); array.Clear(); memory = new Memory<byte>(array, 0, _bufferChecksumer.HashSizeInBytes); _bufferChecksumer.GetFinalHash(memory.Span); if (UseMasterCrc) StorageCrc64Composer.Compose((_composedCrc.ToArray(), 0), (memory.ToArray(), _buffer.Length)).CopyTo(_composedCrc); _bufferChecksumer?.Dispose(); _bufferChecksumer = ContentHasher.GetHasherFromAlgorithmId(_checksumAlgorithm); } UploadTransferValidationOptions val2 = new UploadTransferValidationOptions(); val2.set_ChecksumAlgorithm(_checksumAlgorithm); val2.set_PrecalculatedChecksum((ReadOnlyMemory<byte>)memory); validationOptions = val2; return result; } } }