UnsafeBufferSequence
This class is a helper to write to an IBufferWriter<T> in a thread-safe manner.
It uses the shared pool to allocate buffers and returns them to the pool when disposed.
Since there is no way to ensure that no one kept a reference to one of the buffers,
it must be disposed of in the same context in which it was created, and references to it should not be stored or shared.
using System.Buffers;
using System.IO;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
namespace System.ClientModel.Internal
{
internal sealed class UnsafeBufferSequence : IBufferWriter<byte>, IDisposable
{
/// <summary>
/// Concrete <see cref="Reader"/> subclass whose only member is a public constructor.
/// It exists so that the enclosing type can instantiate a reader even though
/// <see cref="Reader"/> itself only exposes a <c>private protected</c> constructor.
/// </summary>
private class ReaderInstance : Reader
{
    /// <summary>Forwards both arguments unchanged to the <see cref="Reader"/> constructor.</summary>
    /// <param name="buffers">Segment table handed over to the reader.</param>
    /// <param name="count">Number of entries of <paramref name="buffers"/> that are in use.</param>
    [System.Runtime.CompilerServices.NullableContext(1)]
    public ReaderInstance(UnsafeBufferSegment[] buffers, int count) : base(buffers, count) { }
}
[System.Runtime.CompilerServices.NullableContext(1)]
[System.Runtime.CompilerServices.Nullable(0)]
internal class Reader : IDisposable
{
// Segment table owned by this reader; Dispose returns each segment's array to ArrayPool<byte>.Shared.
private UnsafeBufferSegment[] _buffers;
// Number of leading entries of _buffers that are in use.
private int _count;
// Set at the end of Dispose; checked by Length, CopyTo and ToBinaryData before they touch the segments.
private bool _isDisposed;
// NOTE(review): this lock object is static, so Dispose calls on unrelated Reader instances
// serialize on the same monitor — confirm whether a per-instance lock was intended.
private static readonly object _disposeLock = new object();
/// <summary>
/// Gets the total number of written bytes, summed over the segments held by this reader.
/// </summary>
/// <exception cref="ObjectDisposedException">The reader has already been disposed.</exception>
public long Length
{
    get
    {
        if (_isDisposed)
        {
            throw new ObjectDisposedException("Reader");
        }

        // Accumulate in a long: the sum of several int-sized segments may exceed int.MaxValue.
        long total = 0;
        int index = 0;
        while (index < _count)
        {
            total += _buffers[index].Written;
            index++;
        }

        return total;
    }
}
/// <summary>
/// Stores the segment table and the number of used entries.
/// Being <c>private protected</c>, this constructor is only reachable from derived types in the same assembly.
/// </summary>
/// <param name="buffers">Segment table the reader will read from and later return to the pool.</param>
/// <param name="count">Number of leading entries of <paramref name="buffers"/> that are in use.</param>
private protected Reader(UnsafeBufferSegment[] buffers, int count)
{
    _count = count;
    _buffers = buffers;
}
/// <summary>
/// Writes the written portion of every segment, in order, to <paramref name="stream"/>.
/// </summary>
/// <param name="stream">Destination stream.</param>
/// <param name="cancellation">Token checked before each segment is written.</param>
/// <exception cref="ObjectDisposedException">The reader has already been disposed.</exception>
/// <exception cref="OperationCanceledException"><paramref name="cancellation"/> was signalled between segments.</exception>
public void CopyTo(Stream stream, CancellationToken cancellation)
{
    if (_isDisposed)
    {
        throw new ObjectDisposedException("Reader");
    }

    int index = 0;
    while (index < _count)
    {
        cancellation.ThrowIfCancellationRequested();

        // Only the first `Written` bytes of the rented array carry data.
        UnsafeBufferSegment segment = _buffers[index];
        stream.Write(segment.Array, 0, segment.Written);
        index++;
    }
}
// Decompiler rendering of an async method. The awaited logic lives in the compiler-generated
// state machine <CopyToAsync>d__8, which is not visible in this file; this stub only wires it up.
// NOTE(review): presumably the asynchronous counterpart of CopyTo — confirm against the
// state machine's MoveNext before relying on that.
[AsyncStateMachine(typeof(<CopyToAsync>d__8))]
public Task CopyToAsync(Stream stream, CancellationToken cancellation)
{
<CopyToAsync>d__8 stateMachine = default(<CopyToAsync>d__8);
stateMachine.<>t__builder = AsyncTaskMethodBuilder.Create();
// Capture the receiver and both arguments so the state machine can use them.
stateMachine.<>4__this = this;
stateMachine.stream = stream;
stateMachine.cancellation = cancellation;
// -1 is the state value the stub assigns before the first Start call.
stateMachine.<>1__state = -1;
stateMachine.<>t__builder.Start(ref stateMachine);
// The returned task is the one owned by the state machine's builder.
return stateMachine.<>t__builder.Task;
}
/// <summary>
/// Copies everything written to the segments into a single contiguous buffer and wraps it in a
/// <see cref="BinaryData"/>. The result does not reference the pooled arrays.
/// </summary>
/// <returns>A <see cref="BinaryData"/> holding a copy of the written bytes.</returns>
/// <exception cref="ObjectDisposedException">The reader has already been disposed.</exception>
/// <exception cref="InvalidOperationException">
/// The total length exceeds <see cref="int.MaxValue"/> and therefore cannot fit in one array.
/// </exception>
public BinaryData ToBinaryData()
{
    if (_isDisposed)
    {
        throw new ObjectDisposedException("Reader");
    }

    long length = Length;
    if (length > int.MaxValue)
    {
        // The previous message was a mangled interpolated string with no descriptive text;
        // spell out what went wrong and include both values.
        throw new InvalidOperationException(
            $"The buffered content is too long to fit in a single BinaryData instance. Length was {length}; the maximum is {int.MaxValue}.");
    }

    // Single-segment fast path: one exact-size copy, no intermediate stream.
    if (_count == 1)
    {
        int written = _buffers[0].Written;
        byte[] copy = new byte[written];
        Buffer.BlockCopy(_buffers[0].Array, 0, copy, 0, written);
        return new BinaryData(copy);
    }

    // Multiple (or zero) segments: gather them through a stream pre-sized to the total length.
    using (MemoryStream memoryStream = new MemoryStream((int)length))
    {
        CopyTo(memoryStream, default(CancellationToken));

        // GetBuffer exposes the stream's backing array without another copy; slice to what was written.
        return new BinaryData((ReadOnlyMemory<byte>)memoryStream.GetBuffer().AsMemory(0, (int)memoryStream.Position));
    }
}
/// <summary>
/// Returns every segment's array to <see cref="ArrayPool{T}.Shared"/> and marks the reader as disposed.
/// Calling it again after it has completed does nothing.
/// </summary>
public void Dispose()
{
    // Cheap check outside the lock for the already-disposed case.
    if (_isDisposed)
    {
        return;
    }

    lock (_disposeLock)
    {
        // Re-check under the lock: another caller may have finished disposing in the meantime.
        if (_isDisposed)
        {
            return;
        }

        int rentedCount = _count;
        UnsafeBufferSegment[] rented = _buffers;

        // Detach the segments from the reader before handing the arrays back to the pool.
        _count = 0;
        _buffers = Array.Empty<UnsafeBufferSegment>();

        int index = 0;
        while (index < rentedCount)
        {
            ArrayPool<byte>.Shared.Return(rented[index].Array, clearArray: false);
            index++;
        }

        _isDisposed = true;
    }
}
}
// Segment table. ExpandBuffers, Dispose and ExtractReader each replace it with a different array
// instance while holding _lock.
[System.Runtime.CompilerServices.Nullable(1)]
private volatile UnsafeBufferSegment[] _buffers;
// Number of entries of _buffers that currently hold a rented array; the active segment is at _count - 1.
private volatile int _count;
// Lower bound on the size requested from the pool for a new segment (GetMemory rents the larger of
// this and the size hint).
private readonly int _segmentSize;
// Held while _buffers and _count are replaced. Advance and GetMemory read those fields without taking it.
[System.Runtime.CompilerServices.Nullable(1)]
private readonly object _lock = new object();
/// <summary>
/// Creates an empty sequence. No buffer is rented until memory is first requested.
/// </summary>
/// <param name="segmentSize">
/// Size requested from the pool for each new segment, unless a larger size hint is passed to
/// <see cref="GetMemory(int)"/>. Defaults to 16384.
/// </param>
public UnsafeBufferSequence(int segmentSize = 16384)
{
    _buffers = Array.Empty<UnsafeBufferSegment>();
    _segmentSize = segmentSize;
}
/// <summary>
/// Records that <paramref name="bytesWritten"/> bytes were written into the active (last) segment.
/// Must be called only after <see cref="GetMemory(int)"/> or <see cref="GetSpan(int)"/> has handed
/// out a segment; with no segment the lookup below indexes the table at -1 and fails.
/// </summary>
/// <param name="bytesWritten">
/// Number of bytes written. Must be non-negative and no larger than the space remaining in the
/// active segment.
/// </param>
/// <exception cref="ArgumentOutOfRangeException">
/// <paramref name="bytesWritten"/> is negative or would move the write position past the end of
/// the active segment's array.
/// </exception>
public void Advance(int bytesWritten)
{
    ref UnsafeBufferSegment last = ref _buffers[_count - 1];

    // Validate BEFORE mutating: a rejected call must not leave Written pointing past the end of
    // the array, otherwise later CopyTo/GetMemory calls on this segment would fail.
    // Comparing against the remaining space (instead of adding first) also avoids int overflow.
    if (bytesWritten < 0 || bytesWritten > last.Array.Length - last.Written)
    {
        throw new ArgumentOutOfRangeException("bytesWritten");
    }

    last.Written += bytesWritten;
}
/// <summary>
/// Returns writable memory of at least <paramref name="sizeHint"/> bytes (after the hint is raised
/// to a floor of 256). The unused tail of the active segment is returned when it is large enough;
/// otherwise a new segment is rented and returned in full.
/// </summary>
/// <param name="sizeHint">Minimum number of bytes requested; values below 256 are treated as 256.</param>
/// <returns>Memory over the free part of the active segment, or over a freshly rented segment.</returns>
public Memory<byte> GetMemory(int sizeHint = 0)
{
    sizeHint = Math.Max(sizeHint, 256);

    // A new segment is never smaller than the configured segment size.
    int sizeToRent = Math.Max(sizeHint, _segmentSize);

    // First use (or first use after the segments were handed off): rent the initial segment.
    if (_buffers.Length == 0)
    {
        ExpandBuffers(sizeToRent);
    }

    ref UnsafeBufferSegment tail = ref _buffers[_count - 1];
    Memory<byte> remaining = tail.Array.AsMemory(tail.Written);
    if (remaining.Length < sizeHint)
    {
        // Not enough room left in the active segment: append a new one and hand it out whole.
        ExpandBuffers(sizeToRent);
        return _buffers[_count - 1].Array;
    }

    return remaining;
}
/// <summary>
/// Appends one newly rented segment of at least <paramref name="sizeToRent"/> bytes and makes it
/// the active segment. Runs under <c>_lock</c>.
/// </summary>
/// <param name="sizeToRent">Minimum array length requested from <see cref="ArrayPool{T}.Shared"/>.</param>
private void ExpandBuffers(int sizeToRent)
{
    lock (_lock)
    {
        int used = _count;

        // NOTE(review): a new table of twice the used count is allocated on every call, even when
        // the current table still has free slots — confirm whether that is intentional.
        int capacity = used == 0 ? 1 : used * 2;
        UnsafeBufferSegment[] grown = new UnsafeBufferSegment[capacity];
        if (used > 0)
        {
            Array.Copy(_buffers, grown, _buffers.Length);
        }

        // Publish the larger table first, then fill the next free slot and bump the count.
        _buffers = grown;
        grown[used].Array = ArrayPool<byte>.Shared.Rent(sizeToRent);
        _count = used + 1;
    }
}
/// <summary>
/// Span-returning variant of <see cref="GetMemory(int)"/>: obtains the memory block and exposes its span.
/// </summary>
/// <param name="sizeHint">Minimum number of bytes requested; passed through to <see cref="GetMemory(int)"/>.</param>
/// <returns>A span over the memory returned by <see cref="GetMemory(int)"/>.</returns>
public Span<byte> GetSpan(int sizeHint = 0) => GetMemory(sizeHint).Span;
/// <summary>
/// Detaches all segments from the sequence and returns their arrays to <see cref="ArrayPool{T}.Shared"/>.
/// The sequence is left empty, in the same state as a newly constructed one.
/// </summary>
public void Dispose()
{
    int rentedCount;
    UnsafeBufferSegment[] rented;

    // Swap the state out under the lock; the pool returns happen outside it.
    lock (_lock)
    {
        rentedCount = _count;
        rented = _buffers;
        _count = 0;
        _buffers = Array.Empty<UnsafeBufferSegment>();
    }

    int index = 0;
    while (index < rentedCount)
    {
        ArrayPool<byte>.Shared.Return(rented[index].Array, clearArray: false);
        index++;
    }
}
/// <summary>
/// Hands the current segments over to a new <see cref="Reader"/> and resets this sequence to empty.
/// After the call the returned reader holds the segment table; this sequence no longer references it.
/// </summary>
/// <returns>A reader over the segments written so far.</returns>
[System.Runtime.CompilerServices.NullableContext(1)]
public Reader ExtractReader()
{
    lock (_lock)
    {
        // Build the reader from the live state first, then clear that state.
        Reader reader = new ReaderInstance(_buffers, _count);

        _count = 0;
        _buffers = Array.Empty<UnsafeBufferSegment>();

        return reader;
    }
}
}
}