PooledMemoryStream
Functions like a readable MemoryStream but uses an ArrayPool to supply the backing memory.
This stream supports buffering long sizes.
using System;
using System.Buffers;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
namespace Azure.Storage.Shared
{
internal class PooledMemoryStream : Stream
{
/// <summary>
/// One backing array together with the number of bytes at its start that hold stream data.
/// </summary>
private class BufferPartition
{
    /// <summary>
    /// The backing array for this partition.
    /// </summary>
    public byte[] Buffer { get; set; }

    /// <summary>
    /// Number of bytes in <see cref="Buffer"/>, counted from index 0, that contain data.
    /// </summary>
    public int DataLength { get; set; }
}
/// <summary>
/// 134217728 bytes (128 * 1024 * 1024).
/// </summary>
/// <remarks>
/// NOTE(review): the identifier was missing from this declaration and no visible code
/// references the constant, so the name used here is a reconstruction - confirm it
/// against the original source.
/// </remarks>
private const int DefaultMaxArrayPoolRentalSize = 134217728;

/// <summary>
/// Size requested from <see cref="ArrayPool"/> whenever a new backing array is rented.
/// </summary>
public int MaxArraySize { get; }

/// <summary>
/// Pool that backing arrays are rented from and returned to.
/// </summary>
public ArrayPool<byte> ArrayPool { get; }

/// <summary>
/// The partitions that make up the stream contents, in stream order.
/// </summary>
private List<BufferPartition> BufferSet { get; } = new List<BufferPartition>();

/// <inheritdoc/>
public override bool CanRead => true;

/// <inheritdoc/>
public override bool CanSeek => true;

/// <inheritdoc/>
public override bool CanWrite => true;
/// <summary>
/// Total number of data bytes held across all partitions.
/// </summary>
/// <remarks>
/// Each partition length is widened to <c>long</c> before summing. Summing the
/// <c>int</c> values directly binds to the <c>int</c> overload of <c>Enumerable.Sum</c>,
/// which throws <see cref="OverflowException"/> once the total exceeds
/// <c>int.MaxValue</c>.
/// </remarks>
public override long Length => BufferSet.Sum(partition => (long)partition.DataLength);

/// <summary>
/// Current position within the stream. The setter performs no validation.
/// </summary>
public override long Position { get; set; }
/// <summary>
/// Creates a stream that rents its backing arrays from <paramref name="arrayPool"/>.
/// </summary>
/// <param name="arrayPool">Pool stored in <see cref="ArrayPool"/>.</param>
/// <param name="maxArraySize">Rental size stored in <see cref="MaxArraySize"/>.</param>
public PooledMemoryStream(ArrayPool<byte> arrayPool, int maxArraySize)
    => (ArrayPool, MaxArraySize) = (arrayPool, maxArraySize);

/// <summary>
/// Creates a stream without assigning a pool or a rental size;
/// <see cref="ArrayPool"/> stays <c>null</c> and <see cref="MaxArraySize"/> stays 0.
/// </summary>
public PooledMemoryStream()
{
}
// Decompiler output for an async method: this is only the compiler-generated kick-off stub.
// It copies the arguments into the state-machine struct, marks it as not yet started
// (state -1), starts it, and returns the builder's task. The actual logic lives in the
// state-machine type, which is not visible here.
// NOTE(review): the method identifier is missing from the declaration below; the
// state-machine type name suggests it is BufferStreamPartitionInternal - confirm against
// the original source.
[AsyncStateMachine(typeof(<BufferStreamPartitionInternal>d__13))]
internal static Task<PooledMemoryStream> (Stream stream, long minCount, long maxCount, ArrayPool<byte> arrayPool, int? maxArrayPoolRentalSize, bool async, CancellationToken cancellationToken)
{
<BufferStreamPartitionInternal>d__13 stateMachine = default(<BufferStreamPartitionInternal>d__13);
stateMachine.<>t__builder = AsyncTaskMethodBuilder<PooledMemoryStream>.Create();
// Capture every argument so the state machine can use it across awaits.
stateMachine.stream = stream;
stateMachine.minCount = minCount;
stateMachine.maxCount = maxCount;
stateMachine.arrayPool = arrayPool;
stateMachine.maxArrayPoolRentalSize = maxArrayPoolRentalSize;
stateMachine.async = async;
stateMachine.cancellationToken = cancellationToken;
stateMachine.<>1__state = -1;
stateMachine.<>t__builder.Start(ref stateMachine);
return stateMachine.<>t__builder.Task;
}
// Decompiler output for an async method: this is only the compiler-generated kick-off stub.
// It copies the arguments into the state-machine struct, marks it as not yet started
// (state -1), starts it, and returns the builder's task. The actual read loop lives in the
// state-machine type, which is not visible here.
// NOTE(review): the method identifier is missing from the declaration below; the
// state-machine type name suggests it is ReadLoopInternal - confirm against the original
// source.
[AsyncStateMachine(typeof(<ReadLoopInternal>d__14))]
private static Task<int> (Stream stream, byte[] buffer, int offset, int minCount, int maxCount, bool async, CancellationToken cancellationToken)
{
<ReadLoopInternal>d__14 stateMachine = default(<ReadLoopInternal>d__14);
stateMachine.<>t__builder = AsyncTaskMethodBuilder<int>.Create();
// Capture every argument so the state machine can use it across awaits.
stateMachine.stream = stream;
stateMachine.buffer = buffer;
stateMachine.offset = offset;
stateMachine.minCount = minCount;
stateMachine.maxCount = maxCount;
stateMachine.async = async;
stateMachine.cancellationToken = cancellationToken;
stateMachine.<>1__state = -1;
stateMachine.<>t__builder.Start(ref stateMachine);
return stateMachine.<>t__builder.Task;
}
/// <summary>
/// Intentionally empty: <c>Write</c> copies data straight into the backing arrays,
/// so there is nothing to flush.
/// </summary>
public override void Flush()
{
}
/// <summary>
/// Copies up to <paramref name="count"/> bytes from the current position into
/// <paramref name="buffer"/>, starting at index <paramref name="offset"/>, and advances
/// <see cref="Position"/> by the number of bytes copied.
/// </summary>
/// <param name="buffer">Destination array.</param>
/// <param name="offset">Index in <paramref name="buffer"/> at which the first byte is stored.</param>
/// <param name="count">Maximum number of bytes to copy.</param>
/// <returns>
/// The number of bytes copied; 0 when the position is at or past the end of the data.
/// </returns>
/// <exception cref="InvalidOperationException">The position is negative.</exception>
public override int Read(byte[] buffer, int offset, int count)
{
    // Length re-sums every partition on each access and nothing in this method changes it,
    // so read it once.
    long length = Length;
    if (Position >= length)
    {
        return 0;
    }

    int read = 0;
    while (read < count && Position < length)
    {
        (byte[] currentBuffer, int bufferCount, long offsetOfBuffer) = GetBufferFromPosition();

        // Bounded by: data left in the stream, data left in this partition, space left in the request.
        long positionInBuffer = Position - offsetOfBuffer;
        int toCopy = (int)Min(length - Position, bufferCount - positionInBuffer, count - read);

        // The destination index must include the caller's offset; previously the data was
        // always written starting at index 0 of the destination array.
        Array.Copy(currentBuffer, positionInBuffer, buffer, offset + read, toCopy);

        read += toCopy;
        Position += toCopy;
    }

    return read;
}
/// <summary>
/// Reads the byte at the current position and advances <see cref="Position"/> by one.
/// </summary>
/// <returns>The byte value, or -1 when the position is at or past the end of the data.</returns>
/// <exception cref="InvalidOperationException">The position is negative.</exception>
public override int ReadByte()
{
    if (Position >= Length)
    {
        return -1;
    }

    (byte[] currentBuffer, _, long offsetOfBuffer) = GetBufferFromPosition();

    byte value = currentBuffer[Position - offsetOfBuffer];
    Position++;
    return value;
}
/// <summary>
/// Finds the partition that contains the byte at <see cref="Position"/>.
/// </summary>
/// <returns>
/// The partition's array, the number of data bytes in it, and the stream offset at which
/// the partition starts.
/// </returns>
/// <exception cref="InvalidOperationException">
/// The position is outside the stream, or no partition covers it.
/// </exception>
private (byte[] CurrentBuffer, int BufferCount, long OffsetOfBuffer) GetBufferFromPosition()
{
    AssertPositionInBounds();

    long partitionStart = 0;
    foreach (BufferPartition partition in BufferSet)
    {
        long partitionEnd = partitionStart + partition.DataLength;
        if (Position < partitionEnd)
        {
            return (partition.Buffer, partition.DataLength, partitionStart);
        }

        partitionStart = partitionEnd;
    }

    throw new InvalidOperationException("Incorrect stream partition length.");
}
/// <summary>
/// Moves <see cref="Position"/> relative to <paramref name="origin"/>.
/// </summary>
/// <param name="offset">Byte offset relative to <paramref name="origin"/>.</param>
/// <param name="origin">Reference point for <paramref name="offset"/>.</param>
/// <returns>The new position. The resulting position is not range-checked here.</returns>
/// <exception cref="ArgumentException"><paramref name="origin"/> is not a defined value.</exception>
public override long Seek(long offset, SeekOrigin origin)
{
    switch (origin)
    {
        case SeekOrigin.Begin:
            Position = offset;
            break;
        case SeekOrigin.Current:
            Position += offset;
            break;
        case SeekOrigin.End:
            Position = Length + offset;
            break;
        default:
            // Previously an undefined origin was silently ignored and the unchanged
            // position was returned as if the seek had succeeded.
            throw new ArgumentException("Invalid seek origin.", nameof(origin));
    }

    return Position;
}
/// <summary>
/// Not supported by this stream.
/// </summary>
/// <param name="value">Ignored.</param>
/// <exception cref="NotSupportedException">Always thrown.</exception>
public override void SetLength(long value)
{
    throw new NotSupportedException();
}
/// <summary>
/// Appends <paramref name="count"/> bytes from <paramref name="buffer"/>, starting at
/// <paramref name="offset"/>, to the end of the stored data, renting additional arrays
/// from <see cref="ArrayPool"/> as needed.
/// </summary>
/// <remarks>
/// Data is always appended after the existing data: <see cref="Position"/> is not used to
/// choose where the bytes go, it is only advanced by the number of bytes written.
/// </remarks>
/// <param name="buffer">Source array.</param>
/// <param name="offset">Index in <paramref name="buffer"/> of the first byte to write.</param>
/// <param name="count">Number of bytes to write.</param>
/// <exception cref="InvalidOperationException">The pool handed out a zero-length array.</exception>
public override void Write(byte[] buffer, int offset, int count)
{
    while (count > 0)
    {
        BufferPartition target = GetLatestBufferWithAvailableSpaceOrDefault();
        if (target == null)
        {
            byte[] rented = ArrayPool.Rent(MaxArraySize);
            if (rented.Length == 0)
            {
                // A zero-length rental can never accept data; without this check the loop
                // would keep renting and appending empty partitions forever.
                ArrayPool.Return(rented, false);
                throw new InvalidOperationException("Cannot write to a zero-length pooled buffer.");
            }

            target = new BufferPartition
            {
                Buffer = rented,
                DataLength = 0
            };
            BufferSet.Add(target);
        }

        // Fill whatever room is left in the partition; the remainder goes to the next one.
        int toCopy = Math.Min(target.Buffer.Length - target.DataLength, count);
        Array.Copy(buffer, offset, target.Buffer, target.DataLength, toCopy);

        target.DataLength += toCopy;
        count -= toCopy;
        offset += toCopy;
        Position += toCopy;
    }
}
/// <summary>
/// Disposes the base stream and then returns every rented array to the pool via
/// <c>Clear</c>, regardless of the value of <paramref name="disposing"/>.
/// </summary>
/// <param name="disposing">Forwarded unchanged to the base implementation.</param>
protected override void Dispose(bool disposing)
{
    base.Dispose(disposing);
    Clear();
}
/// <summary>
/// Returns every backing array to <see cref="ArrayPool"/> (without asking the pool to
/// clear it), drops all partitions, and resets <see cref="Position"/> to 0.
/// </summary>
public void Clear()
{
    foreach (BufferPartition partition in BufferSet)
    {
        ArrayPool.Return(partition.Buffer, false);
    }

    BufferSet.Clear();
    Position = 0;
}
/// <summary>
/// Throws unless <see cref="Position"/> refers to an existing byte,
/// i.e. 0 &lt;= Position &lt; Length.
/// </summary>
/// <exception cref="InvalidOperationException">The position is outside that range.</exception>
private void AssertPositionInBounds()
{
    if (Position >= Length || Position < 0)
    {
        throw new InvalidOperationException("Cannot read outside the bounds of this stream.");
    }
}
/// <summary>
/// Returns the last partition if its array still has unused room.
/// </summary>
/// <returns>
/// The last partition, or <c>null</c> when there are no partitions or the last one is full.
/// </returns>
private BufferPartition GetLatestBufferWithAvailableSpaceOrDefault()
{
    BufferPartition last = BufferSet.LastOrDefault();
    bool hasRoom = last != null && last.DataLength < last.Buffer.Length;
    return hasRoom ? last : null;
}
/// <summary>
/// Returns the smallest of three values.
/// </summary>
private static long Min(long val1, long val2, long val3)
{
    return Math.Min(val1, Math.Min(val2, val3));
}
}
}