// TimeoutEngine
using Polly.Utilities;
using System;
using System.Runtime.ExceptionServices;
using System.Threading;
using System.Threading.Tasks;
namespace Polly.Timeout
{
/// <summary>
/// Executes delegates under a timeout and reports overruns as <see cref="TimeoutRejectedException"/>.
/// </summary>
internal static class TimeoutEngine
{
    /// <summary>
    /// Synchronously executes <paramref name="action"/> under the timeout returned by
    /// <paramref name="timeoutProvider"/>.
    /// </summary>
    /// <typeparam name="TResult">The type of value returned by the delegate.</typeparam>
    /// <param name="action">The delegate to execute. It receives a token that is cancelled when either
    /// <paramref name="cancellationToken"/> or the internal timeout token is cancelled.</param>
    /// <param name="context">Execution context passed to <paramref name="action"/>,
    /// <paramref name="timeoutProvider"/> and <paramref name="onTimeout"/>.</param>
    /// <param name="cancellationToken">The caller's cancellation token.</param>
    /// <param name="timeoutProvider">Supplies the timeout for this execution.</param>
    /// <param name="timeoutStrategy">With <see cref="TimeoutStrategy.Optimistic"/> the delegate is invoked
    /// directly on the calling thread; otherwise it is run via <see cref="Task.Run{TResult}(Func{TResult}, CancellationToken)"/>
    /// and the calling thread waits on it.</param>
    /// <param name="onTimeout">Invoked, before <see cref="TimeoutRejectedException"/> is thrown, with the context,
    /// the timeout and the task running the delegate (<c>null</c> for the optimistic strategy).</param>
    /// <returns>The value returned by <paramref name="action"/>.</returns>
    /// <exception cref="OperationCanceledException"><paramref name="cancellationToken"/> was already cancelled on entry.</exception>
    /// <exception cref="TimeoutRejectedException">An exception surfaced after the timeout token source had been cancelled.</exception>
    internal static TResult Implementation<TResult>(
        Func<Context, CancellationToken, TResult> action,
        Context context,
        CancellationToken cancellationToken,
        Func<Context, TimeSpan> timeoutProvider,
        TimeoutStrategy timeoutStrategy,
        Action<Context, TimeSpan, Task> onTimeout)
    {
        cancellationToken.ThrowIfCancellationRequested();
        TimeSpan timeout = timeoutProvider(context);

        using (CancellationTokenSource timeoutCancellationTokenSource = new CancellationTokenSource())
        using (CancellationTokenSource combinedTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, timeoutCancellationTokenSource.Token))
        {
            CancellationToken combinedToken = combinedTokenSource.Token;

            // Stays null for the optimistic strategy; onTimeout then receives null.
            Task<TResult> actionTask = null;

            try
            {
                // Both strategies start the countdown before the delegate is invoked.
                SystemClock.CancelTokenAfter(timeoutCancellationTokenSource, timeout);

                if (timeoutStrategy == TimeoutStrategy.Optimistic)
                {
                    // The delegate runs on this thread, so the timeout is only observed
                    // if the delegate itself throws once the token has been cancelled.
                    return action(context, combinedToken);
                }

                actionTask = Task.Run(() => action(context, combinedToken), combinedToken);
                try
                {
                    // Only the timeout token can abort the wait; the caller's token reaches the
                    // delegate (and Task.Run) through combinedToken instead.
                    actionTask.Wait(timeoutCancellationTokenSource.Token);
                }
                catch (AggregateException aggregate) when (aggregate.InnerExceptions.Count == 1)
                {
                    // Surface the delegate's own exception rather than the AggregateException
                    // wrapper, keeping its original stack trace.
                    ExceptionDispatchInfo.Capture(aggregate.InnerException).Throw();
                }

                return actionTask.Result;
            }
            catch (Exception ex)
            {
                // NOTE(review): any exception type is reported as a timeout once the timeout
                // source has fired, not only OperationCanceledException - confirm this is intended.
                if (timeoutCancellationTokenSource.IsCancellationRequested)
                {
                    onTimeout(context, timeout, actionTask);
                    throw new TimeoutRejectedException("The delegate executed through TimeoutPolicy did not complete within the timeout.", ex);
                }

                throw;
            }
        }
    }

    /// <summary>
    /// Asynchronously executes <paramref name="action"/> under the timeout returned by
    /// <paramref name="timeoutProvider"/>.
    /// </summary>
    /// <typeparam name="TResult">The type of value produced by the delegate's task.</typeparam>
    /// <param name="action">The delegate to execute. It receives a token that is cancelled when either
    /// <paramref name="cancellationToken"/> or the internal timeout token is cancelled.</param>
    /// <param name="context">Execution context passed to <paramref name="action"/>,
    /// <paramref name="timeoutProvider"/> and <paramref name="onTimeoutAsync"/>.</param>
    /// <param name="timeoutProvider">Supplies the timeout for this execution.</param>
    /// <param name="timeoutStrategy">With <see cref="TimeoutStrategy.Optimistic"/> the delegate's task is awaited
    /// directly; otherwise it is raced against a task that is cancelled when the timeout token fires.</param>
    /// <param name="onTimeoutAsync">Awaited, before <see cref="TimeoutRejectedException"/> is thrown, with the context,
    /// the timeout and the delegate's task (<c>null</c> for the optimistic strategy).</param>
    /// <param name="cancellationToken">The caller's cancellation token.</param>
    /// <param name="continueOnCapturedContext">Passed to every <c>ConfigureAwait</c> call in this method.</param>
    /// <returns>A task producing the value returned by <paramref name="action"/>.</returns>
    /// <exception cref="OperationCanceledException"><paramref name="cancellationToken"/> was already cancelled on entry.</exception>
    /// <exception cref="TimeoutRejectedException">An exception surfaced after the timeout token source had been cancelled.</exception>
    internal static async Task<TResult> ImplementationAsync<TResult>(
        Func<Context, CancellationToken, Task<TResult>> action,
        Context context,
        Func<Context, TimeSpan> timeoutProvider,
        TimeoutStrategy timeoutStrategy,
        Func<Context, TimeSpan, Task, Task> onTimeoutAsync,
        CancellationToken cancellationToken,
        bool continueOnCapturedContext)
    {
        cancellationToken.ThrowIfCancellationRequested();
        TimeSpan timeout = timeoutProvider(context);

        using (CancellationTokenSource timeoutCancellationTokenSource = new CancellationTokenSource())
        using (CancellationTokenSource combinedTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, timeoutCancellationTokenSource.Token))
        {
            // Stays null for the optimistic strategy; onTimeoutAsync then receives null.
            Task<TResult> actionTask = null;
            CancellationToken combinedToken = combinedTokenSource.Token;

            try
            {
                if (timeoutStrategy == TimeoutStrategy.Optimistic)
                {
                    SystemClock.CancelTokenAfter(timeoutCancellationTokenSource, timeout);
                    return await action(context, combinedToken).ConfigureAwait(continueOnCapturedContext);
                }

                // Set up the cancellation-backed task before the countdown starts, then race it
                // against the delegate's task.
                Task<TResult> timeoutTask = AsTask<TResult>(timeoutCancellationTokenSource.Token);
                SystemClock.CancelTokenAfter(timeoutCancellationTokenSource, timeout);

                actionTask = action(context, combinedToken);

                Task<TResult> firstCompleted = await Task.WhenAny<TResult>(actionTask, timeoutTask).ConfigureAwait(continueOnCapturedContext);

                // Awaiting the winner yields its result, or rethrows its fault/cancellation.
                return await firstCompleted.ConfigureAwait(continueOnCapturedContext);
            }
            catch (Exception ex)
            {
                // NOTE(review): any exception type is reported as a timeout once the timeout
                // source has fired, not only OperationCanceledException - confirm this is intended.
                if (timeoutCancellationTokenSource.IsCancellationRequested)
                {
                    await onTimeoutAsync(context, timeout, actionTask).ConfigureAwait(continueOnCapturedContext);
                    throw new TimeoutRejectedException("The delegate executed asynchronously through TimeoutPolicy did not complete within the timeout.", ex);
                }

                // Plain rethrow keeps the original stack trace.
                throw;
            }
        }
    }

    /// <summary>
    /// Creates a task that never produces a result and transitions to the cancelled state when
    /// <paramref name="cancellationToken"/> is cancelled.
    /// </summary>
    /// <typeparam name="TResult">The result type of the returned task; used only so it can be combined
    /// with other <see cref="Task{TResult}"/> instances.</typeparam>
    /// <param name="cancellationToken">The token whose cancellation cancels the returned task.</param>
    /// <returns>A task that completes only by being cancelled.</returns>
    private static Task<TResult> AsTask<TResult>(this CancellationToken cancellationToken)
    {
        TaskCompletionSource<TResult> tcs = new TaskCompletionSource<TResult>();

        // Declared before Register so the callback can capture it and dispose its own
        // registration after firing. The null-conditional covers the callback running
        // before the assignment below has completed.
        IDisposable registration = null;
        registration = cancellationToken.Register(() =>
        {
            tcs.TrySetCanceled();
            registration?.Dispose();
        }, useSynchronizationContext: false);

        return tcs.Task;
    }
}
}