ReadTimeoutStream
Read-only Stream that throws an OperationCanceledException if it has to wait longer than a configurable timeout to read more data.
using System.IO;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
namespace System.ClientModel.Internal
{
/// <summary>
/// Read-only <see cref="Stream"/> wrapper that arms a timer around every read of the
/// wrapped stream. When the timer fires, the wrapped stream is disposed so that a
/// blocked read fails; the failure is then handed to
/// <c>CancellationHelper.ThrowIfCancellationRequestedOrTimeout</c> for translation.
/// </summary>
[System.Runtime.CompilerServices.NullableContext(1)]
[System.Runtime.CompilerServices.Nullable(0)]
internal class ReadTimeoutStream : Stream
{
    // The wrapped stream; owned by this instance and disposed together with it.
    private readonly Stream _stream;

    // Maximum time a single read may take before the timeout token is cancelled.
    private TimeSpan _readTimeout;

    // Source whose cancellation (scheduled via CancelAfter) disposes the wrapped stream.
    // Replaced by InitializeTokenSource() once it has been cancelled.
    private CancellationTokenSource _cancellationTokenSource;

    /// <summary>Mirrors <see cref="Stream.CanRead"/> of the wrapped stream.</summary>
    public override bool CanRead => _stream.CanRead;

    /// <summary>Mirrors <see cref="Stream.CanSeek"/> of the wrapped stream.</summary>
    public override bool CanSeek => _stream.CanSeek;

    /// <summary>Always <c>false</c>; this wrapper does not support writing.</summary>
    public override bool CanWrite => false;

    /// <summary>Length of the wrapped stream.</summary>
    public override long Length => _stream.Length;

    /// <summary>Gets or sets the position of the wrapped stream.</summary>
    public override long Position {
        get {
            return _stream.Position;
        }
        set {
            _stream.Position = value;
        }
    }

    /// <summary>
    /// Gets or sets the read timeout in milliseconds. Setting it also forwards the value
    /// to the wrapped stream when that stream reports <see cref="Stream.CanTimeout"/>.
    /// </summary>
    public override int ReadTimeout {
        get {
            return (int)_readTimeout.TotalMilliseconds;
        }
        set {
            _readTimeout = TimeSpan.FromMilliseconds((double)value);
            UpdateReadTimeout();
        }
    }

    /// <summary>
    /// Wraps <paramref name="stream"/> and enforces <paramref name="readTimeout"/> on each read.
    /// </summary>
    /// <param name="stream">The stream to read from.</param>
    /// <param name="readTimeout">Maximum duration allowed for a single read.</param>
    /// <exception cref="ArgumentNullException"><paramref name="stream"/> is <c>null</c>.</exception>
    public ReadTimeoutStream(Stream stream, TimeSpan readTimeout)
    {
        // Fail fast: without this check the NullReferenceException raised inside
        // UpdateReadTimeout() is swallowed there and the problem only shows up on first use.
        if (stream == null) {
            throw new ArgumentNullException(nameof(stream));
        }

        _stream = stream;
        _readTimeout = readTimeout;
        UpdateReadTimeout();
        InitializeTokenSource();
    }

    /// <summary>
    /// Closes this stream and the wrapped stream.
    /// </summary>
    public override void Close()
    {
        // Stream.Dispose() is routed through Close(), and the base Close() is what invokes
        // Dispose(true). Delegating to it guarantees that the wrapped stream AND the
        // cancellation token source are released; closing only the wrapped stream here
        // would leave the token source undisposed.
        base.Close();
    }

    /// <summary>No-op; the stream is read-only so there is nothing to flush.</summary>
    public override void Flush()
    {
    }

    /// <summary>
    /// Reads from the wrapped stream with the read timeout armed for the duration of the call.
    /// </summary>
    /// <param name="buffer">Destination buffer.</param>
    /// <param name="offset">Offset in <paramref name="buffer"/> at which to start storing data.</param>
    /// <param name="count">Maximum number of bytes to read.</param>
    /// <returns>The value returned by the wrapped stream's <c>Read</c>.</returns>
    public override int Read(byte[] buffer, int offset, int count)
    {
        bool disposeSource;
        CancellationTokenSource timeoutSource = StartTimeout(default(CancellationToken), out disposeSource);
        try {
            return _stream.Read(buffer, offset, count);
        } catch (Exception ex) when (ex is IOException || ex is ObjectDisposedException || ex is OperationCanceledException) {
            // The wrapped stream is disposed when the timeout fires, so any of these may
            // really be a timeout. Let the helper decide whether to throw something else;
            // if it returns, propagate the original exception unchanged.
            CancellationHelper.ThrowIfCancellationRequestedOrTimeout(default(CancellationToken), timeoutSource.Token, ex, _readTimeout);
            throw;
        } finally {
            StopTimeout(timeoutSource, disposeSource);
        }
    }

    // NOTE(review): compiler-generated async bootstrap emitted by the decompiler. The state
    // machine type <ReadAsync>d__21 is not part of this listing, so the method is kept
    // verbatim; presumably it mirrors Read() using the caller's token - confirm against the
    // original source before hand-editing.
    [AsyncStateMachine(typeof(<ReadAsync>d__21))]
    public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
    {
        <ReadAsync>d__21 stateMachine = default(<ReadAsync>d__21);
        stateMachine.<>t__builder = AsyncTaskMethodBuilder<int>.Create();
        stateMachine.<>4__this = this;
        stateMachine.buffer = buffer;
        stateMachine.offset = offset;
        stateMachine.count = count;
        stateMachine.cancellationToken = cancellationToken;
        stateMachine.<>1__state = -1;
        stateMachine.<>t__builder.Start(ref stateMachine);
        return stateMachine.<>t__builder.Task;
    }

    /// <summary>Seeks the wrapped stream.</summary>
    public override long Seek(long offset, SeekOrigin origin)
    {
        return _stream.Seek(offset, origin);
    }

    /// <summary>Not supported.</summary>
    /// <exception cref="NotSupportedException">Always thrown.</exception>
    public override void SetLength(long value)
    {
        throw new NotSupportedException();
    }

    /// <summary>Not supported; the stream is read-only.</summary>
    /// <exception cref="NotSupportedException">Always thrown.</exception>
    public override void Write(byte[] buffer, int offset, int count)
    {
        throw new NotSupportedException();
    }

    /// <summary>
    /// Arms the read timeout and returns the token source a read should observe.
    /// </summary>
    /// <param name="additionalToken">Optional caller token to combine with the timeout token.</param>
    /// <param name="dispose">
    /// Set to <c>true</c> when the returned source is a freshly created linked source that
    /// must be disposed by <see cref="StopTimeout"/>; <c>false</c> when it is the shared source.
    /// </param>
    private CancellationTokenSource StartTimeout(CancellationToken additionalToken, out bool dispose)
    {
        // A cancelled source cannot be re-armed, so replace it first.
        if (_cancellationTokenSource.IsCancellationRequested) {
            InitializeTokenSource();
        }

        CancellationTokenSource result;
        if (additionalToken.CanBeCanceled) {
            result = CancellationTokenSource.CreateLinkedTokenSource(additionalToken, _cancellationTokenSource.Token);
            dispose = true;
        } else {
            result = _cancellationTokenSource;
            dispose = false;
        }

        _cancellationTokenSource.CancelAfter(_readTimeout);
        return result;
    }

    /// <summary>
    /// Creates a fresh timeout source whose cancellation disposes the wrapped stream.
    /// </summary>
    private void InitializeTokenSource()
    {
        _cancellationTokenSource = new CancellationTokenSource();
        _cancellationTokenSource.Token.Register(
            state => ((ReadTimeoutStream)state).DisposeStream(),
            this);
    }

    /// <summary>
    /// Disarms the read timeout and disposes <paramref name="source"/> when it was a linked source.
    /// </summary>
    private void StopTimeout(CancellationTokenSource source, bool dispose)
    {
        try {
            _cancellationTokenSource.CancelAfter(Timeout.InfiniteTimeSpan);
        } catch (ObjectDisposedException) {
            // The stream was closed while the read was in flight, so there is no timer left
            // to disarm. This runs from finally blocks; throwing here would mask the
            // exception that is already propagating.
        }

        if (dispose) {
            source.Dispose();
        }
    }

    /// <summary>
    /// Pushes the current timeout to the wrapped stream when it supports timeouts.
    /// </summary>
    private void UpdateReadTimeout()
    {
        try {
            if (_stream.CanTimeout) {
                _stream.ReadTimeout = (int)_readTimeout.TotalMilliseconds;
            }
        } catch {
            // Deliberately best-effort: the timeout is still enforced through
            // _cancellationTokenSource.CancelAfter in StartTimeout, so a wrapped stream
            // that rejects the value is not treated as an error.
        }
    }

    /// <summary>
    /// Releases the wrapped stream and the timeout source.
    /// </summary>
    /// <param name="disposing">
    /// <c>true</c> when called from <c>Dispose()</c>/<c>Close()</c>; managed objects are
    /// only touched in that case.
    /// </param>
    protected override void Dispose(bool disposing)
    {
        if (disposing) {
            _stream.Dispose();
            _cancellationTokenSource.Dispose();
        }

        base.Dispose(disposing);
    }

    // Timeout callback target: disposing the wrapped stream is what interrupts a pending read.
    private void DisposeStream()
    {
        _stream.Dispose();
    }
}
}