<PackageReference Include="System.ClientModel" Version="1.2.0" />

ReadTimeoutStream

Read-only Stream that throws an OperationCanceledException if it has to wait longer than a configurable timeout for more data to be read.
// ReadTimeoutStream: a read-only Stream wrapper that enforces a per-read timeout on an inner stream.
//
// NOTE(review): this text looks like decompiler output (explicit NullableContext/Nullable attributes,
// an [AsyncStateMachine] stub, identifiers such as <ReadAsync>d__21 that are not legal C# source names).
// The comments below describe only what is visible here.
//
// How the timeout works:
//   * InitializeTokenSource creates a CancellationTokenSource and registers a callback on its token
//     that calls DisposeStream(), i.e. disposes the inner stream when that token is cancelled.
//   * StartTimeout arms the timer with CancelAfter(_readTimeout) before each read. If the shared source
//     has already been cancelled it is replaced first via InitializeTokenSource.
//     NOTE(review): the replaced source is overwritten without being disposed - confirm this is intended.
//     When the caller's token can be cancelled, a linked source (caller token + timeout token) is
//     returned and `dispose` is set to true; otherwise the shared source is returned with dispose = false.
//   * StopTimeout disarms the timer with CancelAfter(Timeout.InfiniteTimeSpan) and disposes the linked
//     source when one was created.
//
// Members:
//   * CanRead / CanSeek / Length / Position / Seek forward to the inner stream.
//   * CanWrite is always false; Write and SetLength throw NotSupportedException; Flush does nothing.
//   * ReadTimeout (milliseconds): the getter truncates _readTimeout.TotalMilliseconds to int; the setter
//     stores the new value and calls UpdateReadTimeout.
//   * UpdateReadTimeout copies the timeout to the inner stream's ReadTimeout only when the inner stream
//     reports CanTimeout; any exception thrown while doing so is swallowed by an empty catch (best effort).
//   * Read(byte[], int, int): arms the timeout (no caller token), reads from the inner stream, and on
//     IOException, ObjectDisposedException or OperationCanceledException calls
//     CancellationHelper.ThrowIfCancellationRequestedOrTimeout(...) before rethrowing the original
//     exception. CancellationHelper is defined elsewhere; presumably it throws a cancellation/timeout
//     exception when the supplied token has fired - TODO confirm. These exception types are presumably
//     caught because the timeout callback disposes the inner stream underneath a pending read.
//   * ReadAsync: only the compiler-generated stub is visible; it copies the arguments into the state
//     machine <ReadAsync>d__21 and starts it. The actual async body lives in that type, which is not
//     part of this listing.
//   * Close() closes only the inner stream; it does not call base.Close() or Dispose, so the token
//     source is not disposed on this path. NOTE(review): confirm this is intentional.
using System.IO; using System.Runtime.CompilerServices; using System.Threading; using System.Threading.Tasks; namespace System.ClientModel.Internal { [System.Runtime.CompilerServices.NullableContext(1)] [System.Runtime.CompilerServices.Nullable(0)] internal class ReadTimeoutStream : Stream { private readonly Stream _stream; private TimeSpan _readTimeout; private CancellationTokenSource _cancellationTokenSource; public override bool CanRead => _stream.CanRead; public override bool CanSeek => _stream.CanSeek; public override bool CanWrite => false; public override long Length => _stream.Length; public override long Position { get { return _stream.Position; } set { _stream.Position = value; } } public override int ReadTimeout { get { return (int)_readTimeout.TotalMilliseconds; } set { _readTimeout = TimeSpan.FromMilliseconds((double)value); UpdateReadTimeout(); } } public ReadTimeoutStream(Stream stream, TimeSpan readTimeout) { _stream = stream; _readTimeout = readTimeout; UpdateReadTimeout(); InitializeTokenSource(); } public override void Close() { _stream.Close(); } public override void Flush() { } public override int Read(byte[] buffer, int offset, int count) { bool dispose; CancellationTokenSource cancellationTokenSource = StartTimeout(default(CancellationToken), out dispose); try { return _stream.Read(buffer, offset, count); } catch (IOException innerException) { CancellationHelper.ThrowIfCancellationRequestedOrTimeout(default(CancellationToken), cancellationTokenSource.Token, innerException, _readTimeout); throw; } catch (ObjectDisposedException innerException2) { CancellationHelper.ThrowIfCancellationRequestedOrTimeout(default(CancellationToken), cancellationTokenSource.Token, innerException2, _readTimeout); throw; } catch (OperationCanceledException innerException3) { CancellationHelper.ThrowIfCancellationRequestedOrTimeout(default(CancellationToken), cancellationTokenSource.Token, innerException3, _readTimeout); throw; } finally { 
// (end of Read) Runs on success and on failure: disarm the timeout timer and dispose the linked source if one was created.
StopTimeout(cancellationTokenSource, dispose); } } [AsyncStateMachine(typeof(<ReadAsync>d__21))] public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) { <ReadAsync>d__21 stateMachine = default(<ReadAsync>d__21); stateMachine.<>t__builder = AsyncTaskMethodBuilder<int>.Create(); stateMachine.<>4__this = this; stateMachine.buffer = buffer; stateMachine.offset = offset; stateMachine.count = count; stateMachine.cancellationToken = cancellationToken; stateMachine.<>1__state = -1; stateMachine.<>t__builder.Start(ref stateMachine); return stateMachine.<>t__builder.Task; } public override long Seek(long offset, SeekOrigin origin) { return _stream.Seek(offset, origin); } public override void SetLength(long value) { throw new NotSupportedException(); } public override void Write(byte[] buffer, int offset, int count) { throw new NotSupportedException(); } private CancellationTokenSource StartTimeout(CancellationToken additionalToken, out bool dispose) { if (_cancellationTokenSource.IsCancellationRequested) InitializeTokenSource(); CancellationTokenSource result; if (additionalToken.CanBeCanceled) { result = CancellationTokenSource.CreateLinkedTokenSource(additionalToken, _cancellationTokenSource.Token); dispose = true; } else { result = _cancellationTokenSource; dispose = false; } _cancellationTokenSource.CancelAfter(_readTimeout); return result; } private void InitializeTokenSource() { _cancellationTokenSource = new CancellationTokenSource(); _cancellationTokenSource.Token.Register(delegate(object state) { ((ReadTimeoutStream)state).DisposeStream(); }, this); } private void StopTimeout(CancellationTokenSource source, bool dispose) { _cancellationTokenSource.CancelAfter(Timeout.InfiniteTimeSpan); if (dispose) source.Dispose(); } private void UpdateReadTimeout() { try { if (_stream.CanTimeout) _stream.ReadTimeout = (int)_readTimeout.TotalMilliseconds; } catch { } } protected override void Dispose(bool disposing) { 
// (Dispose body) The inner stream and the token source are disposed unconditionally; the `disposing` flag is only forwarded to base.Dispose.
base.Dispose(disposing); _stream.Dispose(); _cancellationTokenSource.Dispose(); } private void DisposeStream() { _stream.Dispose(); } } }