AvroReader
using System;
using System.Collections.Generic;
using System.IO;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
namespace Azure.Storage.Internal.Avro
{
internal class AvroReader : IDisposable
{
private readonly Stream _dataStream;
private readonly Stream _headerStream;
private byte[] _syncMarker;
private Dictionary<string, string> _metadata;
private AvroType _itemType;
private long _itemsRemainingInBlock;
private bool _initalized;
private bool _disposed;
private readonly long _initialBlockOffset;
public virtual long BlockOffset { get; set; }
public virtual long ObjectIndex { get; set; }
public AvroReader(Stream dataStream)
{
if (dataStream.CanSeek) {
_dataStream = dataStream;
_headerStream = dataStream;
} else {
_dataStream = new StreamWithPosition(dataStream, 0);
_headerStream = _dataStream;
}
_metadata = new Dictionary<string, string>();
_initalized = false;
}
public AvroReader(Stream dataStream, Stream headerStream, long currentBlockOffset, long indexWithinCurrentBlock)
{
if (dataStream.CanSeek)
_dataStream = dataStream;
else
_dataStream = new StreamWithPosition(dataStream, 0);
if (headerStream.CanSeek)
_headerStream = headerStream;
else
_headerStream = new StreamWithPosition(headerStream, 0);
_metadata = new Dictionary<string, string>();
_initalized = false;
_initialBlockOffset = currentBlockOffset;
BlockOffset = currentBlockOffset;
ObjectIndex = indexWithinCurrentBlock;
_initalized = false;
}
public AvroReader()
{
}
[AsyncStateMachine(typeof(<Initalize>d__20))]
public virtual Task Initalize(bool async, CancellationToken cancellationToken = default(CancellationToken))
{
<Initalize>d__20 stateMachine = default(<Initalize>d__20);
stateMachine.<>t__builder = AsyncTaskMethodBuilder.Create();
stateMachine.<>4__this = this;
stateMachine.async = async;
stateMachine.cancellationToken = cancellationToken;
stateMachine.<>1__state = -1;
stateMachine.<>t__builder.Start(ref stateMachine);
return stateMachine.<>t__builder.Task;
}
public virtual bool HasNext()
{
if (_initalized)
return _itemsRemainingInBlock > 0;
return true;
}
[AsyncStateMachine(typeof(<Next>d__22))]
public virtual Task<object> Next(bool async, CancellationToken cancellationToken = default(CancellationToken))
{
<Next>d__22 stateMachine = default(<Next>d__22);
stateMachine.<>t__builder = AsyncTaskMethodBuilder<object>.Create();
stateMachine.<>4__this = this;
stateMachine.async = async;
stateMachine.cancellationToken = cancellationToken;
stateMachine.<>1__state = -1;
stateMachine.<>t__builder.Start(ref stateMachine);
return stateMachine.<>t__builder.Task;
}
public void Dispose()
{
Dispose(true);
}
protected virtual void Dispose(bool disposing)
{
if (!_disposed) {
if (disposing) {
_dataStream.Dispose();
_headerStream.Dispose();
}
_disposed = true;
}
}
}
}