LazyLoadingReadOnlyStream<TProperties>
Used for Open Read APIs.
using Azure.Core;
using Azure.Core.Pipeline;
using Azure.Storage.Shared;
using System;
using System.Buffers;
using System.Globalization;
using System.IO;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
namespace Azure.Storage
{
internal class LazyLoadingReadOnlyStream<TProperties> : Stream
{
public delegate Task<Response<IDownloadedContent>> DownloadInternalAsync (HttpRange range, DownloadTransferValidationOptions transferValidation, bool async, CancellationToken cancellationToken);
public delegate Task<Response<TProperties>> GetPropertiesAsync (bool async, CancellationToken cancellationToken);
public delegate HttpRange PredictEncryptedRangeAdjustment (HttpRange range);
private long _position;
private long _length;
private readonly int _bufferSize;
private byte[] _buffer;
private int _bufferPosition;
private int _bufferLength;
private readonly bool _allowBlobModifications;
private bool _bufferInvalidated;
private readonly DownloadInternalAsync _downloadInternalFunc;
private readonly GetPropertiesAsync _getPropertiesInternalFunc;
private readonly DownloadTransferValidationOptions _validationOptions;
private readonly PredictEncryptedRangeAdjustment _predictEncryptedRangeAdjustment;
public static PredictEncryptedRangeAdjustment NoRangeAdjustment => (HttpRange range) => range;
public override bool CanRead => true;
public override bool CanSeek => true;
public override bool CanWrite => false;
public override long Length => _length;
public override long Position {
get {
return _position;
}
set {
Seek(value, SeekOrigin.Begin);
}
}
public LazyLoadingReadOnlyStream(DownloadInternalAsync downloadInternalFunc, GetPropertiesAsync getPropertiesFunc, DownloadTransferValidationOptions transferValidation, bool allowModifications, long initialLength, long position = 0, int? bufferSize = default(int?), PredictEncryptedRangeAdjustment rangePredictionFunc = null)
{
_downloadInternalFunc = downloadInternalFunc;
_getPropertiesInternalFunc = getPropertiesFunc;
_predictEncryptedRangeAdjustment = (rangePredictionFunc ?? ((PredictEncryptedRangeAdjustment)((HttpRange range) => range)));
_position = position;
int defaultValue = (int)(allowModifications ? 4194304 : Math.Min(initialLength, 4194304));
_bufferSize = bufferSize.GetValueOrDefault(defaultValue);
_buffer = ArrayPool<byte>.Shared.Rent(_bufferSize);
_allowBlobModifications = allowModifications;
_bufferPosition = 0;
_bufferLength = 0;
_length = initialLength;
_bufferInvalidated = false;
if (transferValidation != null && !transferValidation.get_AutoValidateChecksum())
throw Errors.CannotDeferTransactionalHashVerification();
object validationOptions;
if (transferValidation != null) {
DownloadTransferValidationOptions val = new DownloadTransferValidationOptions();
validationOptions = (object)val;
val.set_ChecksumAlgorithm(transferValidation.get_ChecksumAlgorithm());
val.set_AutoValidateChecksum(false);
} else
validationOptions = null;
_validationOptions = validationOptions;
}
public override int Read(byte[] buffer, int offset, int count)
{
return Azure.Core.Pipeline.TaskExtensions.EnsureCompleted<int>(ReadInternal(buffer, offset, count, false, default(CancellationToken)));
}
[AsyncStateMachine(typeof(LazyLoadingReadOnlyStream<>.<ReadAsync>d__19))]
public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
<ReadAsync>d__19 stateMachine = default(<ReadAsync>d__19);
stateMachine.<>t__builder = AsyncTaskMethodBuilder<int>.Create();
stateMachine.<>4__this = this;
stateMachine.buffer = buffer;
stateMachine.offset = offset;
stateMachine.count = count;
stateMachine.cancellationToken = cancellationToken;
stateMachine.<>1__state = -1;
stateMachine.<>t__builder.Start(ref stateMachine);
return stateMachine.<>t__builder.Task;
}
[AsyncStateMachine(typeof(LazyLoadingReadOnlyStream<>.<ReadInternal>d__20))]
public Task<int> ReadInternal(byte[] buffer, int offset, int count, bool async, CancellationToken cancellationToken)
{
<ReadInternal>d__20 stateMachine = default(<ReadInternal>d__20);
stateMachine.<>t__builder = AsyncTaskMethodBuilder<int>.Create();
stateMachine.<>4__this = this;
stateMachine.buffer = buffer;
stateMachine.offset = offset;
stateMachine.count = count;
stateMachine.async = async;
stateMachine.cancellationToken = cancellationToken;
stateMachine.<>1__state = -1;
stateMachine.<>t__builder.Start(ref stateMachine);
return stateMachine.<>t__builder.Task;
}
[AsyncStateMachine(typeof(LazyLoadingReadOnlyStream<>.<DownloadInternal>d__21))]
private Task<int> DownloadInternal(bool async, CancellationToken cancellationToken)
{
<DownloadInternal>d__21 stateMachine = default(<DownloadInternal>d__21);
stateMachine.<>t__builder = AsyncTaskMethodBuilder<int>.Create();
stateMachine.<>4__this = this;
stateMachine.async = async;
stateMachine.cancellationToken = cancellationToken;
stateMachine.<>1__state = -1;
stateMachine.<>t__builder.Start(ref stateMachine);
return stateMachine.<>t__builder.Task;
}
private static void ValidateReadParameters(byte[] buffer, int offset, int count)
{
if (buffer == null)
throw new ArgumentNullException("buffer", "buffer cannot be null.");
if (offset < 0)
throw new ArgumentOutOfRangeException("offset", "offset cannot be less than 0.");
if (offset > buffer.Length)
throw new ArgumentOutOfRangeException("offset", "offset cannot exceed buffer length.");
if (count < 0)
throw new ArgumentOutOfRangeException("count", "count cannot be less than 0.");
}
protected override void Dispose(bool disposing)
{
base.Dispose(disposing);
if (_buffer != null) {
ArrayPool<byte>.Shared.Return(_buffer, true);
_buffer = null;
}
}
[AsyncStateMachine(typeof(LazyLoadingReadOnlyStream<>.<GetBlobLengthInternal>d__24))]
private Task<long> GetBlobLengthInternal(bool async, CancellationToken cancellationToken)
{
<GetBlobLengthInternal>d__24 stateMachine = default(<GetBlobLengthInternal>d__24);
stateMachine.<>t__builder = AsyncTaskMethodBuilder<long>.Create();
stateMachine.<>4__this = this;
stateMachine.async = async;
stateMachine.cancellationToken = cancellationToken;
stateMachine.<>1__state = -1;
stateMachine.<>t__builder.Start(ref stateMachine);
return stateMachine.<>t__builder.Task;
}
private static long GetBlobLengthFromResponse(Response response)
{
ResponseHeaders headers = response.get_Headers();
string text = default(string);
headers.TryGetValue("Content-Range", ref text);
if (text == null)
throw new ArgumentException("Content-Range header is missing on download response.");
return Convert.ToInt64(text.Split('/', StringSplitOptions.None)[1], CultureInfo.InvariantCulture);
}
private static HttpRange GetResponseRange(Response response)
{
ResponseHeaders headers = response.get_Headers();
string text = default(string);
headers.TryGetValue("Content-Range", ref text);
if (text == null)
throw new InvalidOperationException("Content-Range header is missing on download response.");
string[] array = text.Split('/', StringSplitOptions.None)[0].Split('-', StringSplitOptions.None);
long num = Convert.ToInt64(array[0].Split(' ', StringSplitOptions.None)[1], CultureInfo.InvariantCulture);
long num2 = Convert.ToInt64(array[1], CultureInfo.InvariantCulture);
return new HttpRange(num, (long?)(num2 - num + 1));
}
public override long Seek(long offset, SeekOrigin origin)
{
long num = CalculateNewPosition(offset, origin);
if (num == _position)
return _position;
if (num < 0)
throw new ArgumentException(string.Format("New {0} cannot be less than 0. Value was {1}", "offset", num), "offset");
if (num > _length)
throw new ArgumentException("You cannot seek past the last known length of the underlying blob or file.", "offset");
long num2 = _position - _bufferPosition;
if (num < _position && num >= num2) {
_bufferPosition = (int)(num - num2);
_position = num;
return num;
}
long num3 = _position + (_bufferLength - _bufferPosition);
if (num > _position && num < num3) {
_bufferPosition = (int)(num - num2);
_position = num;
return num;
}
_bufferInvalidated = true;
_position = num;
return num;
}
internal long CalculateNewPosition(long offset, SeekOrigin origin)
{
switch (origin) {
case SeekOrigin.Begin:
return offset;
case SeekOrigin.Current:
return _position + offset;
case SeekOrigin.End:
if (_allowBlobModifications)
throw new ArgumentException("Cannot Seek with SeekOrigin.End on a growing blob or file. Call Stream.Seek(Stream.Length, SeekOrigin.Begin) to get to the end of known data.", "origin");
return _length + offset;
default:
throw new ArgumentException("Unknown $SeekOrigin value", "origin");
}
}
public override void SetLength(long value)
{
throw new NotSupportedException();
}
public override void Write(byte[] buffer, int offset, int count)
{
throw new NotSupportedException();
}
public override void Flush()
{
}
public override Task FlushAsync(CancellationToken cancellationToken)
{
return Task.CompletedTask;
}
}
}