// PooledMemoryStream
// Functions like a readable MemoryStream but uses an ArrayPool to supply the backing memory.
// This stream supports buffering long sizes.
using System;
using System.Buffers;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
namespace Azure.Storage.Shared
{
/// <summary>
/// A readable, writable and seekable <see cref="Stream"/> whose backing memory is a list of
/// buffers rented from an <see cref="ArrayPool{T}"/> instead of one contiguous array.
/// Because the data is split across several rented buffers, the total length is tracked
/// as a <see cref="long"/>.
/// </summary>
/// <remarks>
/// Writes always append to the end of the buffered data (see <see cref="WriteImpl"/>);
/// reads are served from <see cref="Position"/>. Rented buffers are handed back to the
/// pool by <see cref="Clear"/>, which is also invoked on dispose.
/// </remarks>
internal class PooledMemoryStream : Stream
{
    /// <summary>
    /// One rented buffer together with the number of leading bytes that hold written data.
    /// </summary>
    private class BufferPartition
    {
        /// <summary>The array rented from the pool.</summary>
        public byte[] Buffer { get; set; }

        /// <summary>Count of bytes written into <see cref="Buffer"/>, starting at index 0.</summary>
        public int DataLength { get; set; }
    }

    /// <summary>Minimum size requested from the pool whenever a write needs a new buffer.</summary>
    private readonly int _newBufferSize;

    /// <summary>
    /// Running total of <see cref="BufferPartition.DataLength"/> over <see cref="BufferSet"/>.
    /// Kept as a <see cref="long"/> so it cannot overflow at 2 GiB, and cached so that
    /// <see cref="Length"/> does not have to walk every partition on each call.
    /// </summary>
    private long _length;

    /// <summary>The pool that buffers are rented from and returned to.</summary>
    public ArrayPool<byte> ArrayPool { get; }

    /// <summary>The rented buffers, in stream order.</summary>
    private List<BufferPartition> BufferSet { get; } = new List<BufferPartition>();

    /// <inheritdoc />
    public override bool CanRead => true;

    /// <inheritdoc />
    public override bool CanSeek => true;

    /// <inheritdoc />
    public override bool CanWrite => true;

    /// <summary>Total number of bytes written into all partitions.</summary>
    public override long Length => _length;

    /// <summary>
    /// Current read position. The setter performs no validation; reading at a position
    /// outside <c>[0, Length)</c> either returns no data or throws (see <see cref="ReadImpl"/>).
    /// </summary>
    public override long Position { get; set; }

    /// <summary>
    /// Creates an empty stream.
    /// </summary>
    /// <param name="arrayPool">Pool used to rent and return the backing buffers.</param>
    /// <param name="bufferSize">Minimum size of every buffer rented while writing. Must be positive.</param>
    /// <param name="initialBufferSize">
    /// When supplied, a first buffer of at least this size is rented immediately.
    /// </param>
    /// <exception cref="ArgumentNullException"><paramref name="arrayPool"/> is null.</exception>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="bufferSize"/> is not positive.</exception>
    public PooledMemoryStream(ArrayPool<byte> arrayPool, int bufferSize, int? initialBufferSize = default(int?))
    {
        if (arrayPool == null)
        {
            throw new ArgumentNullException(nameof(arrayPool));
        }

        // A zero-sized rental can never accept data, so the write loop would rent empty
        // buffers forever; reject it up front instead.
        if (bufferSize <= 0)
        {
            throw new ArgumentOutOfRangeException(nameof(bufferSize), bufferSize, "Buffer size must be positive.");
        }

        ArrayPool = arrayPool;
        _newBufferSize = bufferSize;

        if (initialBufferSize.HasValue)
        {
            BufferSet.Add(new BufferPartition
            {
                Buffer = ArrayPool.Rent(initialBufferSize.Value),
                DataLength = 0
            });
        }
    }

    /// <summary>No-op: all data already lives in memory.</summary>
    public override void Flush()
    {
    }

    /// <inheritdoc />
    public override int Read(byte[] buffer, int offset, int count)
    {
        return ReadImpl(new Span<byte>(buffer, offset, count));
    }

    /// <summary>
    /// Reads synchronously and returns an already-completed task, or a canceled task when
    /// <paramref name="cancellationToken"/> is already canceled.
    /// </summary>
    public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
    {
        if (cancellationToken.IsCancellationRequested)
        {
            return Task.FromCanceled<int>(cancellationToken);
        }

        return Task.FromResult(ReadImpl(new Span<byte>(buffer, offset, count)));
    }

    /// <inheritdoc />
    public override int Read(Span<byte> buffer)
    {
        return ReadImpl(buffer);
    }

    /// <summary>
    /// Reads synchronously and returns an already-completed result, or a canceled task when
    /// <paramref name="cancellationToken"/> is already canceled.
    /// </summary>
    public override ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken = default(CancellationToken))
    {
        if (cancellationToken.IsCancellationRequested)
        {
            return new ValueTask<int>(Task.FromCanceled<int>(cancellationToken));
        }

        return new ValueTask<int>(ReadImpl(buffer.Span));
    }

    /// <summary>
    /// Copies bytes starting at <see cref="Position"/> into <paramref name="destBuffer"/>,
    /// crossing partition boundaries as needed, and advances <see cref="Position"/>.
    /// </summary>
    /// <returns>Number of bytes copied; 0 when <see cref="Position"/> is at or past the end.</returns>
    /// <exception cref="InvalidOperationException"><see cref="Position"/> is negative.</exception>
    private int ReadImpl(Span<byte> destBuffer)
    {
        // Length cannot change while reading, so read it once.
        long length = Length;
        if (Position >= length)
        {
            return 0;
        }

        int totalRead = 0;
        while (totalRead < destBuffer.Length && Position < length)
        {
            var located = GetBufferFromPosition();
            int offsetInBuffer = (int)(Position - located.OffsetOfBuffer);

            // Limited by: data left in the stream, data left in this partition,
            // and room left in the destination.
            int toCopy = (int)Min(
                length - Position,
                located.BufferCount - offsetInBuffer,
                destBuffer.Length - totalRead);

            new Span<byte>(located.CurrentBuffer, offsetInBuffer, toCopy).CopyTo(destBuffer.Slice(totalRead));
            totalRead += toCopy;
            Position += toCopy;
        }

        return totalRead;
    }

    /// <summary>Reads one byte at <see cref="Position"/>, or returns -1 at end of stream.</summary>
    public override int ReadByte()
    {
        if (Position >= Length)
        {
            return -1;
        }

        var located = GetBufferFromPosition();
        byte result = located.CurrentBuffer[Position - located.OffsetOfBuffer];
        Position++;
        return result;
    }

    /// <summary>
    /// Finds the partition that contains <see cref="Position"/>.
    /// </summary>
    /// <returns>
    /// The partition's array, its written byte count, and the stream offset of its first byte.
    /// </returns>
    /// <exception cref="InvalidOperationException">
    /// <see cref="Position"/> is outside the buffered data, or the partitions do not add up
    /// to <see cref="Length"/>.
    /// </exception>
    private (byte[] CurrentBuffer, int BufferCount, long OffsetOfBuffer) GetBufferFromPosition()
    {
        AssertPositionInBounds();

        long partitionStart = 0;
        foreach (BufferPartition partition in BufferSet)
        {
            if (partitionStart + partition.DataLength > Position)
            {
                return (partition.Buffer, partition.DataLength, partitionStart);
            }

            partitionStart += partition.DataLength;
        }

        throw new InvalidOperationException("Incorrect stream partition length.");
    }

    /// <summary>
    /// Moves <see cref="Position"/> relative to <paramref name="origin"/>.
    /// </summary>
    /// <returns>The new position.</returns>
    /// <exception cref="ArgumentException"><paramref name="origin"/> is not a defined value.</exception>
    /// <exception cref="IOException">The resulting position would be negative.</exception>
    public override long Seek(long offset, SeekOrigin origin)
    {
        long target;
        switch (origin)
        {
            case SeekOrigin.Begin:
                target = offset;
                break;
            case SeekOrigin.Current:
                target = Position + offset;
                break;
            case SeekOrigin.End:
                target = Length + offset;
                break;
            default:
                throw new ArgumentException("Invalid seek origin.", nameof(origin));
        }

        // Fail here rather than leaving a negative position that a later read trips over.
        if (target < 0)
        {
            throw new IOException("An attempt was made to move the position before the beginning of the stream.");
        }

        Position = target;
        return target;
    }

    /// <summary>Not supported; the length only changes through writes and <see cref="Clear"/>.</summary>
    /// <exception cref="NotSupportedException">Always.</exception>
    public override void SetLength(long value)
    {
        throw new NotSupportedException();
    }

    /// <inheritdoc />
    public override void Write(byte[] buffer, int offset, int count)
    {
        WriteImpl(new ReadOnlySpan<byte>(buffer, offset, count));
    }

    /// <summary>
    /// Writes synchronously and returns a completed task, or a canceled task when
    /// <paramref name="cancellationToken"/> is already canceled.
    /// </summary>
    public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
    {
        if (cancellationToken.IsCancellationRequested)
        {
            return Task.FromCanceled(cancellationToken);
        }

        WriteImpl(new ReadOnlySpan<byte>(buffer, offset, count));
        return Task.CompletedTask;
    }

    /// <inheritdoc />
    public override void Write(ReadOnlySpan<byte> buffer)
    {
        WriteImpl(buffer);
    }

    /// <summary>
    /// Writes synchronously and returns a completed result, or a canceled task when
    /// <paramref name="cancellationToken"/> is already canceled.
    /// </summary>
    public override ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken = default(CancellationToken))
    {
        if (cancellationToken.IsCancellationRequested)
        {
            return new ValueTask(Task.FromCanceled(cancellationToken));
        }

        WriteImpl(buffer.Span);
        return new ValueTask(Task.CompletedTask);
    }

    /// <summary>
    /// Appends <paramref name="writeBuffer"/> to the buffered data, renting additional
    /// buffers of at least <see cref="_newBufferSize"/> bytes as the last one fills up.
    /// </summary>
    /// <remarks>
    /// Data is always placed after the last written byte of the last partition, regardless
    /// of <see cref="Position"/>; <see cref="Position"/> is then advanced by the number of
    /// bytes written.
    /// </remarks>
    private void WriteImpl(ReadOnlySpan<byte> writeBuffer)
    {
        while (writeBuffer.Length > 0)
        {
            BufferPartition target = GetLatestBufferWithAvailableSpaceOrDefault();

            // No partition yet, or the last one is full: rent a fresh one.
            if (target == null)
            {
                target = new BufferPartition
                {
                    Buffer = ArrayPool.Rent(_newBufferSize),
                    DataLength = 0
                };
                BufferSet.Add(target);
            }

            // Copy as much as fits into the free tail of the partition.
            int toCopy = Math.Min(target.Buffer.Length - target.DataLength, writeBuffer.Length);
            writeBuffer.Slice(0, toCopy).CopyTo(new Span<byte>(target.Buffer, target.DataLength, toCopy));

            target.DataLength += toCopy;
            _length += toCopy;
            Position += toCopy;
            writeBuffer = writeBuffer.Slice(toCopy);
        }
    }

    /// <summary>Disposes the base stream and returns all rented buffers to the pool.</summary>
    protected override void Dispose(bool disposing)
    {
        base.Dispose(disposing);
        Clear();
    }

    /// <summary>
    /// Returns every rented buffer to <see cref="ArrayPool"/> (without clearing its contents),
    /// empties the stream and resets <see cref="Position"/> to 0.
    /// </summary>
    public void Clear()
    {
        foreach (BufferPartition partition in BufferSet)
        {
            ArrayPool.Return(partition.Buffer, false);
        }

        BufferSet.Clear();
        _length = 0;
        Position = 0;
    }

    /// <summary>Throws when <see cref="Position"/> does not address a buffered byte.</summary>
    /// <exception cref="InvalidOperationException"><see cref="Position"/> is outside <c>[0, Length)</c>.</exception>
    private void AssertPositionInBounds()
    {
        if (Position >= Length || Position < 0)
        {
            throw new InvalidOperationException("Cannot read outside the bounds of this stream.");
        }
    }

    /// <summary>
    /// Returns the last partition if it still has free space; otherwise null
    /// (no partitions exist, or the last one is full).
    /// </summary>
    private BufferPartition GetLatestBufferWithAvailableSpaceOrDefault()
    {
        if (BufferSet.Count == 0)
        {
            return null;
        }

        BufferPartition last = BufferSet[BufferSet.Count - 1];
        return last.DataLength < last.Buffer.Length ? last : null;
    }

    /// <summary>Smallest of three values.</summary>
    private static long Min(long val1, long val2, long val3)
    {
        return Math.Min(Math.Min(val1, val2), val3);
    }
}
}