BlobQuickQueryStream
QuickQueryStream.
using Azure.Core.Pipeline;
using Azure.Storage.Blobs.Models;
using Azure.Storage.Internal.Avro;
using System;
using System.Buffers;
using System.Collections.Generic;
using System.IO;
using System.Runtime.CompilerServices;
using System.Threading.Tasks;
namespace Azure.Storage.Blobs
{
internal class BlobQuickQueryStream : Stream
{
// Underlying stream supplied to the constructor; it is wrapped by _avroReader and disposed in Dispose.
internal Stream _avroStream;
// Reader constructed over _avroStream in the constructor.
internal AvroReader _avroReader;
// Scratch buffer; Dispose returns it to ArrayPool<byte>.Shared, so it is presumably rented from there
// by the read state machine (not visible in this listing) - TODO confirm.
internal byte[] _buffer;
// Offset into _buffer; initialized to 0 by the constructor.
// NOTE(review): presumably the position of the next unread byte - confirm against ReadInternal's state machine.
internal int _bufferOffset;
// Length associated with _buffer; initialized to 0 by the constructor.
// NOTE(review): presumably the count of valid bytes currently buffered - confirm against ReadInternal's state machine.
internal int _bufferLength;
// Optional progress callback passed to the constructor (may be null); its use is not visible in this listing.
internal IProgress<long> _progressHandler;
// Optional error callback passed to the constructor (may be null); invoked by ProcessErrorRecord.
internal Action<BlobQueryError> _errorHandler;
/// <summary>Always <c>true</c>; this stream is read-only.</summary>
public override bool CanRead => true;

/// <summary>Always <c>false</c>; seeking is not supported.</summary>
public override bool CanSeek => false;

/// <summary>Always <c>false</c>; writing is not supported.</summary>
public override bool CanWrite => false;

/// <summary>Not supported.</summary>
/// <exception cref="NotSupportedException">Always thrown.</exception>
public override long Length => throw new NotSupportedException();

/// <summary>Not supported for either reading or assignment.</summary>
/// <exception cref="NotSupportedException">Always thrown by both accessors.</exception>
public override long Position
{
    get => throw new NotSupportedException();
    set => throw new NotSupportedException();
}
/// <summary>
/// Creates a stream that reads through an <see cref="AvroReader"/> layered over <paramref name="avroStream"/>.
/// </summary>
/// <param name="avroStream">The underlying stream handed to the Avro reader.</param>
/// <param name="progressHandler">Optional progress callback; may be <c>null</c>.</param>
/// <param name="errorHandler">Optional callback that receives parsed error records; may be <c>null</c>.</param>
public BlobQuickQueryStream(Stream avroStream, IProgress<long> progressHandler = null, Action<BlobQueryError> errorHandler = null)
{
    // Optional callbacks are stored as given.
    _progressHandler = progressHandler;
    _errorHandler = errorHandler;

    // The reader wraps the same stream instance that is kept in _avroStream.
    _avroStream = avroStream;
    _avroReader = new AvroReader(_avroStream);

    // No buffer is assigned here; both buffer counters start at zero.
    _bufferLength = 0;
    _bufferOffset = 0;
}
/// <summary>
/// Synchronous read: runs <c>ReadInternal</c> with its <c>async</c> flag set to <c>false</c>
/// and unwraps the resulting task via <c>EnsureCompleted</c>.
/// </summary>
/// <param name="buffer">Destination array.</param>
/// <param name="offset">Index in <paramref name="buffer"/> at which to start writing.</param>
/// <param name="count">Maximum number of bytes requested.</param>
/// <returns>The value produced by <c>ReadInternal</c>.</returns>
public override int Read(byte[] buffer, int offset, int count)
{
    Task<int> pendingRead = ReadInternal(false, buffer, offset, count);
    return pendingRead.EnsureCompleted();
}
// Decompiler rendering of an async method: this stub only creates the compiler-generated
// state machine (<ReadAsync>d__9), copies `this` and the arguments into it, starts it, and
// returns the builder's task. The actual method body lives in that state-machine type,
// which is not part of this listing.
// The `new` modifier means this member hides the inherited three-argument ReadAsync
// rather than overriding it.
// NOTE(review): presumably the original awaited ReadInternal with async = true - confirm
// against the state-machine body or the original source.
[AsyncStateMachine(typeof(<ReadAsync>d__9))]
public new Task<int> ReadAsync(byte[] buffer, int offset, int count)
{
<ReadAsync>d__9 stateMachine = default(<ReadAsync>d__9);
stateMachine.<>t__builder = AsyncTaskMethodBuilder<int>.Create();
stateMachine.<>4__this = this;
stateMachine.buffer = buffer;
stateMachine.offset = offset;
stateMachine.count = count;
// -1 is the initial state value the stub assigns before starting the machine.
stateMachine.<>1__state = -1;
stateMachine.<>t__builder.Start(ref stateMachine);
return stateMachine.<>t__builder.Task;
}
// Decompiler rendering of the shared read implementation: this stub only creates the
// compiler-generated state machine (<ReadInternal>d__10), copies `this`, the `async` flag
// and the read arguments into it, starts it, and returns the builder's task. The real
// read logic lives in that state-machine type, which is not part of this listing.
// Read calls this with async = false and unwraps the task with EnsureCompleted.
// NOTE(review): presumably the `async` flag selects between blocking and awaited I/O
// inside the state machine - confirm against the original source.
[AsyncStateMachine(typeof(<ReadInternal>d__10))]
private Task<int> ReadInternal(bool async, byte[] buffer, int offset, int count)
{
<ReadInternal>d__10 stateMachine = default(<ReadInternal>d__10);
stateMachine.<>t__builder = AsyncTaskMethodBuilder<int>.Create();
stateMachine.<>4__this = this;
stateMachine.async = async;
stateMachine.buffer = buffer;
stateMachine.offset = offset;
stateMachine.count = count;
// -1 is the initial state value the stub assigns before starting the machine.
stateMachine.<>1__state = -1;
stateMachine.<>t__builder.Start(ref stateMachine);
return stateMachine.<>t__builder.Task;
}
/// <summary>
/// Validates the standard (buffer, offset, count) triple of a stream read call.
/// </summary>
/// <param name="buffer">Destination array; must not be <c>null</c>.</param>
/// <param name="offset">Start index within <paramref name="buffer"/>; must not be negative.</param>
/// <param name="count">Number of bytes requested; must not be negative.</param>
/// <exception cref="ArgumentNullException"><paramref name="buffer"/> is <c>null</c>.</exception>
/// <exception cref="ArgumentOutOfRangeException">
/// <paramref name="offset"/> or <paramref name="count"/> is negative.
/// </exception>
/// <exception cref="ArgumentException">
/// The range starting at <paramref name="offset"/> and spanning <paramref name="count"/> bytes
/// does not fit inside <paramref name="buffer"/>.
/// </exception>
internal static void ValidateReadParameters(byte[] buffer, int offset, int count)
{
    if (buffer == null)
    {
        throw new ArgumentNullException(nameof(buffer), "Parameter cannot be null.");
    }

    if (offset < 0)
    {
        throw new ArgumentOutOfRangeException(nameof(offset), "Parameter cannot be negative.");
    }

    if (count < 0)
    {
        throw new ArgumentOutOfRangeException(nameof(count), "Parameter cannot be negative.");
    }

    // Compare against the remaining space instead of computing offset + count:
    // the sum wraps around to a negative number for large values (for example
    // offset = int.MaxValue, count = 1) and would slip past the check.
    // buffer.Length - offset cannot overflow because both operands are non-negative here.
    if (count > buffer.Length - offset)
    {
        throw new ArgumentException("The sum of offset and count cannot be greater than buffer length.");
    }
}
/// <summary>
/// Checks that an Avro error record carries the "fatal", "name", "description" and "position"
/// entries and, when an error handler was supplied, forwards them to it as a
/// <see cref="BlobQueryError"/>.
/// </summary>
/// <param name="record">The decoded record, keyed by field name.</param>
/// <exception cref="InvalidOperationException">
/// One of the four entries is absent or <c>null</c>. This is checked whether or not a handler is set.
/// </exception>
internal void ProcessErrorRecord(Dictionary<string, object> record)
{
    // Look everything up first; a missing key simply leaves the local null.
    record.TryGetValue("fatal", out object fatal);
    record.TryGetValue("name", out object name);
    record.TryGetValue("description", out object description);
    record.TryGetValue("position", out object position);

    if (fatal == null)
    {
        throw new InvalidOperationException("Avro error record is missing fatal property");
    }

    if (name == null)
    {
        throw new InvalidOperationException("Avro error record is missing name property");
    }

    if (description == null)
    {
        throw new InvalidOperationException("Avro error record is missing description property");
    }

    if (position == null)
    {
        throw new InvalidOperationException("Avro error record is missing position property");
    }

    // Nobody is listening: the values are never unboxed or cast.
    if (_errorHandler == null)
    {
        return;
    }

    BlobQueryError error = new BlobQueryError
    {
        IsFatal = (bool)fatal,
        Name = (string)name,
        Description = (string)description,
        Position = (long)position
    };
    _errorHandler(error);
}
/// <summary>Not supported.</summary>
/// <exception cref="NotSupportedException">Always thrown.</exception>
public override void Flush() => throw new NotSupportedException();
/// <summary>Not supported; <see cref="CanSeek"/> is <c>false</c>.</summary>
/// <param name="offset">Ignored.</param>
/// <param name="origin">Ignored.</param>
/// <exception cref="NotSupportedException">Always thrown.</exception>
public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException();
/// <summary>Not supported.</summary>
/// <param name="value">Ignored.</param>
/// <exception cref="NotSupportedException">Always thrown.</exception>
public override void SetLength(long value) => throw new NotSupportedException();
/// <summary>Not supported; <see cref="CanWrite"/> is <c>false</c>.</summary>
/// <param name="buffer">Ignored.</param>
/// <param name="offset">Ignored.</param>
/// <param name="count">Ignored.</param>
/// <exception cref="NotSupportedException">Always thrown.</exception>
public override void Write(byte[] buffer, int offset, int count) => throw new NotSupportedException();
/// <summary>
/// Releases the resources held by this stream: returns the scratch buffer to
/// <see cref="ArrayPool{T}.Shared"/> and, when called from an explicit dispose,
/// disposes the underlying stream and the Avro reader.
/// </summary>
/// <param name="disposing">
/// <c>true</c> when called from <c>Dispose()</c>; <c>false</c> when called from a finalizer,
/// in which case other managed objects are not touched.
/// </param>
protected override void Dispose(bool disposing)
{
    try
    {
        // Hand the buffer back exactly once; nulling the field makes repeated
        // Dispose calls harmless and prevents a double return to the pool.
        if (_buffer != null)
        {
            ArrayPool<byte>.Shared.Return(_buffer, clearArray: true);
            _buffer = null;
        }

        if (disposing)
        {
            try
            {
                _avroStream?.Dispose();
            }
            finally
            {
                // Dispose the reader even if closing the underlying stream throws.
                _avroReader?.Dispose();
            }
        }
    }
    finally
    {
        // Forward the caller's flag rather than a hard-coded true, and run the
        // base cleanup last so it happens regardless of failures above.
        base.Dispose(disposing);
    }
}
}
}