<PackageReference Include="Azure.Storage.Blobs" Version="12.24.0" />

PooledMemoryStream

Functions like a readable MemoryStream, but uses an ArrayPool to supply the backing memory. This stream supports buffering content whose total size requires a long (64-bit) length.
using System;
using System.Buffers;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

namespace Azure.Storage.Shared
{
    /// <summary>
    /// A stream whose content is held in a list of byte arrays rented from an
    /// <see cref="System.Buffers.ArrayPool{T}"/>. Content is appended with <see cref="Write"/>
    /// and read back from <see cref="Position"/>; the rented arrays are returned to the
    /// pool by <see cref="Clear"/>, which <see cref="Dispose(bool)"/> also calls.
    /// </summary>
    internal class PooledMemoryStream : Stream
    {
        /// <summary>
        /// One rented array plus the number of leading bytes in it that hold stream content.
        /// </summary>
        private class BufferPartition
        {
            /// <summary>The array rented from the pool.</summary>
            public byte[] Buffer { get; set; }

            /// <summary>Count of valid bytes at the start of <see cref="Buffer"/>.</summary>
            public int DataLength { get; set; }
        }

        // 134217728 == 128 * 1024 * 1024. Not referenced by any member whose body is visible
        // in this file; presumably consumed by the async state machines - TODO confirm.
        private const int DefaultMaxArrayPoolRentalSize = 134217728;

        /// <summary>Size requested from the pool each time a new partition is rented.</summary>
        public int MaxArraySize { get; }

        /// <summary>Pool that backing arrays are rented from and returned to.</summary>
        public ArrayPool<byte> ArrayPool { get; }

        /// <summary>Partitions in content order; only the last one may have free space.</summary>
        private List<BufferPartition> BufferSet { get; } = new List<BufferPartition>();

        public override bool CanRead => true;

        public override bool CanSeek => true;

        public override bool CanWrite => true;

        /// <summary>
        /// Total number of content bytes across all partitions.
        /// </summary>
        /// <remarks>
        /// The per-partition lengths are widened to <see cref="long"/> before summing.
        /// Summing them as <see cref="int"/> uses checked arithmetic and throws
        /// <see cref="OverflowException"/> once the content exceeds <see cref="int.MaxValue"/>.
        /// </remarks>
        public override long Length => BufferSet.Sum(partition => (long)partition.DataLength);

        /// <summary>
        /// Current read position. <see cref="Write"/> advances it by the number of bytes
        /// written but does not use it to decide where the bytes are stored.
        /// </summary>
        public override long Position { get; set; }

        /// <summary>
        /// Creates an empty stream that rents arrays of <paramref name="maxArraySize"/> bytes
        /// from <paramref name="arrayPool"/>.
        /// </summary>
        /// <param name="arrayPool">Pool used for all rentals and returns.</param>
        /// <param name="maxArraySize">Minimum length requested for each rented array.</param>
        public PooledMemoryStream(ArrayPool<byte> arrayPool, int maxArraySize)
        {
            ArrayPool = arrayPool;
            MaxArraySize = maxArraySize;
        }

        /// <summary>
        /// Creates an instance with no pool (<see cref="ArrayPool"/> is null) and a
        /// <see cref="MaxArraySize"/> of zero. <see cref="Write"/> dereferences the pool,
        /// so such an instance cannot be written to.
        /// </summary>
        public PooledMemoryStream()
        {
        }

        // NOTE(review): decompiler output. The method body lives in a compiler-generated
        // state machine type that is not part of this file, so only the launch stub is shown.
        // The statements below are kept exactly as decompiled.
        [AsyncStateMachine(typeof(<BufferStreamPartitionInternal>d__13))]
        internal static Task<PooledMemoryStream> BufferStreamPartitionInternal(Stream stream, long minCount, long maxCount, ArrayPool<byte> arrayPool, int? maxArrayPoolRentalSize, bool async, CancellationToken cancellationToken)
        {
            <BufferStreamPartitionInternal>d__13 stateMachine = default(<BufferStreamPartitionInternal>d__13);
            stateMachine.<>t__builder = AsyncTaskMethodBuilder<PooledMemoryStream>.Create();
            stateMachine.stream = stream;
            stateMachine.minCount = minCount;
            stateMachine.maxCount = maxCount;
            stateMachine.arrayPool = arrayPool;
            stateMachine.maxArrayPoolRentalSize = maxArrayPoolRentalSize;
            stateMachine.async = async;
            stateMachine.cancellationToken = cancellationToken;
            stateMachine.<>1__state = -1;
            stateMachine.<>t__builder.Start(ref stateMachine);
            return stateMachine.<>t__builder.Task;
        }

        // NOTE(review): decompiler output, same situation as above - the real logic is in a
        // compiler-generated state machine type that is not visible here.
        [AsyncStateMachine(typeof(<ReadLoopInternal>d__14))]
        private static Task<int> ReadLoopInternal(Stream stream, byte[] buffer, int offset, int minCount, int maxCount, bool async, CancellationToken cancellationToken)
        {
            <ReadLoopInternal>d__14 stateMachine = default(<ReadLoopInternal>d__14);
            stateMachine.<>t__builder = AsyncTaskMethodBuilder<int>.Create();
            stateMachine.stream = stream;
            stateMachine.buffer = buffer;
            stateMachine.offset = offset;
            stateMachine.minCount = minCount;
            stateMachine.maxCount = maxCount;
            stateMachine.async = async;
            stateMachine.cancellationToken = cancellationToken;
            stateMachine.<>1__state = -1;
            stateMachine.<>t__builder.Start(ref stateMachine);
            return stateMachine.<>t__builder.Task;
        }

        /// <summary>No-op: nothing is buffered outside the partitions themselves.</summary>
        public override void Flush()
        {
        }

        /// <summary>
        /// Copies up to <paramref name="count"/> bytes, starting at <see cref="Position"/>,
        /// into <paramref name="buffer"/> beginning at index <paramref name="offset"/>, and
        /// advances <see cref="Position"/> by the number of bytes copied.
        /// </summary>
        /// <param name="buffer">Destination array.</param>
        /// <param name="offset">Index in <paramref name="buffer"/> of the first byte written.</param>
        /// <param name="count">Maximum number of bytes to copy.</param>
        /// <returns>Bytes copied; 0 when <see cref="Position"/> is at or past the end.</returns>
        /// <exception cref="InvalidOperationException">
        /// <see cref="Position"/> is negative, or the partitions do not cover it.
        /// </exception>
        public override int Read(byte[] buffer, int offset, int count)
        {
            // Length walks every partition; nothing in this method changes it, so read it once.
            long length = Length;
            if (Position >= length)
            {
                return 0;
            }

            int totalCopied = 0;
            while (totalCopied < count && Position < length)
            {
                (byte[] currentBuffer, int bufferCount, long offsetOfBuffer) = GetBufferFromPosition();
                long positionInBuffer = Position - offsetOfBuffer;

                // Bounded by: what is left in the stream, what is left in this partition,
                // and what the caller still wants.
                int toCopy = (int)Min(length - Position, bufferCount - positionInBuffer, count - totalCopied);

                // The destination index must honor the caller's offset; writing at
                // totalCopied alone would place the data at the start of the array.
                Array.Copy(currentBuffer, positionInBuffer, buffer, offset + totalCopied, toCopy);

                totalCopied += toCopy;
                Position += toCopy;
            }

            return totalCopied;
        }

        /// <summary>
        /// Returns the byte at <see cref="Position"/> and advances by one, or -1 when
        /// <see cref="Position"/> is at or past the end.
        /// </summary>
        public override int ReadByte()
        {
            if (Position >= Length)
            {
                return -1;
            }

            (byte[] currentBuffer, _, long offsetOfBuffer) = GetBufferFromPosition();
            byte value = currentBuffer[Position - offsetOfBuffer];
            Position++;
            return value;
        }

        /// <summary>
        /// Finds the partition that contains <see cref="Position"/>.
        /// </summary>
        /// <returns>
        /// The partition's array, its valid byte count, and the stream offset of its first byte.
        /// </returns>
        /// <exception cref="InvalidOperationException">
        /// <see cref="Position"/> is out of bounds, or no partition covers it.
        /// </exception>
        private (byte[] CurrentBuffer, int BufferCount, long OffsetOfBuffer) GetBufferFromPosition()
        {
            AssertPositionInBounds();

            long partitionStart = 0;
            foreach (BufferPartition partition in BufferSet)
            {
                if (partitionStart + partition.DataLength > Position)
                {
                    return (partition.Buffer, partition.DataLength, partitionStart);
                }

                partitionStart += partition.DataLength;
            }

            throw new InvalidOperationException("Incorrect stream partition length.");
        }

        /// <summary>
        /// Sets <see cref="Position"/> relative to <paramref name="origin"/> and returns it.
        /// The resulting position is not validated here; an unrecognized
        /// <paramref name="origin"/> leaves <see cref="Position"/> unchanged.
        /// </summary>
        public override long Seek(long offset, SeekOrigin origin)
        {
            switch (origin)
            {
                case SeekOrigin.Begin:
                    Position = offset;
                    break;
                case SeekOrigin.Current:
                    Position += offset;
                    break;
                case SeekOrigin.End:
                    Position = Length + offset;
                    break;
            }

            return Position;
        }

        /// <summary>Not supported.</summary>
        /// <exception cref="NotSupportedException">Always.</exception>
        public override void SetLength(long value)
        {
            throw new NotSupportedException();
        }

        /// <summary>
        /// Appends <paramref name="count"/> bytes from <paramref name="buffer"/>, starting at
        /// <paramref name="offset"/>, after the last byte already stored, renting further
        /// arrays from <see cref="ArrayPool"/> as the last partition fills up.
        /// </summary>
        /// <remarks>
        /// The write location is always the end of the last partition, independent of
        /// <see cref="Position"/>; <see cref="Position"/> is simply advanced by the number
        /// of bytes written.
        /// </remarks>
        public override void Write(byte[] buffer, int offset, int count)
        {
            while (count > 0)
            {
                BufferPartition target = GetLatestBufferWithAvailableSpaceOrDefault();
                if (target == null)
                {
                    // Last partition is full (or there is none yet): rent a fresh array.
                    target = new BufferPartition
                    {
                        Buffer = ArrayPool.Rent(MaxArraySize),
                        DataLength = 0
                    };
                    BufferSet.Add(target);
                }

                int toCopy = Math.Min(target.Buffer.Length - target.DataLength, count);
                Array.Copy(buffer, offset, target.Buffer, target.DataLength, toCopy);

                target.DataLength += toCopy;
                count -= toCopy;
                offset += toCopy;
                Position += toCopy;
            }
        }

        /// <summary>
        /// Runs the base disposal and then returns every rented array via <see cref="Clear"/>.
        /// </summary>
        protected override void Dispose(bool disposing)
        {
            base.Dispose(disposing);
            Clear();
        }

        /// <summary>
        /// Returns all rented arrays to <see cref="ArrayPool"/> (without clearing their
        /// contents), empties the partition list, and resets <see cref="Position"/> to 0.
        /// </summary>
        public void Clear()
        {
            foreach (BufferPartition partition in BufferSet)
            {
                ArrayPool.Return(partition.Buffer, false);
            }

            BufferSet.Clear();
            Position = 0;
        }

        /// <summary>
        /// Throws when <see cref="Position"/> is negative or at/after <see cref="Length"/>.
        /// </summary>
        /// <exception cref="InvalidOperationException">Position is out of bounds.</exception>
        private void AssertPositionInBounds()
        {
            if (Position >= Length || Position < 0)
            {
                throw new InvalidOperationException("Cannot read outside the bounds of this stream.");
            }
        }

        /// <summary>
        /// Returns the last partition if it still has unused capacity; otherwise null
        /// (also null when there are no partitions).
        /// </summary>
        private BufferPartition GetLatestBufferWithAvailableSpaceOrDefault()
        {
            BufferPartition last = BufferSet.LastOrDefault();
            if (last == null || last.DataLength >= last.Buffer.Length)
            {
                return null;
            }

            return last;
        }

        /// <summary>Smallest of three values.</summary>
        private static long Min(long val1, long val2, long val3)
        {
            return Math.Min(Math.Min(val1, val2), val3);
        }
    }
}