TaskObservableExtensions
Provides a set of static methods for converting tasks to observable sequences.
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Reactive.Linq.ObservableImpl;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
namespace System.Reactive.Threading.Tasks
{
[System.Runtime.CompilerServices.NullableContext(1)]
[System.Runtime.CompilerServices.Nullable(0)]
public static class TaskObservableExtensions
{
[System.Runtime.CompilerServices.Nullable(0)]
private sealed class SlowTaskObservable : IObservable<Unit>
{
private readonly Task _task;
[System.Runtime.CompilerServices.Nullable(2)]
private readonly IScheduler _scheduler;
private readonly bool _ignoreExceptionsAfterUnsubscribe;
public SlowTaskObservable(Task task, [System.Runtime.CompilerServices.Nullable(2)] IScheduler scheduler, bool ignoreExceptionsAfterUnsubscribe)
{
_task = task;
_scheduler = scheduler;
_ignoreExceptionsAfterUnsubscribe = ignoreExceptionsAfterUnsubscribe;
}
public IDisposable Subscribe(IObserver<Unit> observer)
{
if (observer == null)
throw new ArgumentNullException("observer");
CancellationDisposable cancellationDisposable = new CancellationDisposable();
TaskContinuationOptions taskContinuationOptions = GetTaskContinuationOptions(_scheduler);
if (_scheduler == null)
_task.ContinueWith(delegate(Task t, object subjectObject) {
t.EmitTaskResult((IObserver<Unit>)subjectObject);
}, observer, cancellationDisposable.Token, taskContinuationOptions, TaskScheduler.Current);
else
_task.ContinueWithState(delegate(Task task, (IScheduler scheduler, IObserver<Unit> observer) tuple) {
tuple.scheduler.ScheduleAction((task, tuple.observer), delegate((Task task, IObserver<Unit> observer) tuple2) {
tuple2.task.EmitTaskResult(tuple2.observer);
});
}, (_scheduler, observer), taskContinuationOptions, cancellationDisposable.Token);
if (_ignoreExceptionsAfterUnsubscribe)
_task.ContinueWith((Task t) => t.Exception, TaskContinuationOptions.OnlyOnFaulted);
return cancellationDisposable;
}
}
[System.Runtime.CompilerServices.Nullable(0)]
private sealed class SlowTaskObservable<[System.Runtime.CompilerServices.Nullable(2)] TResult> : IObservable<TResult>
{
private readonly Task<TResult> _task;
[System.Runtime.CompilerServices.Nullable(2)]
private readonly IScheduler _scheduler;
private readonly bool _ignoreExceptionsAfterUnsubscribe;
public SlowTaskObservable(Task<TResult> task, [System.Runtime.CompilerServices.Nullable(2)] IScheduler scheduler, bool ignoreExceptionsAfterUnsubscribe)
{
_task = task;
_scheduler = scheduler;
_ignoreExceptionsAfterUnsubscribe = ignoreExceptionsAfterUnsubscribe;
}
public IDisposable Subscribe(IObserver<TResult> observer)
{
if (observer == null)
throw new ArgumentNullException("observer");
CancellationDisposable cancellationDisposable = new CancellationDisposable();
TaskContinuationOptions taskContinuationOptions = GetTaskContinuationOptions(_scheduler);
if (_scheduler == null)
_task.ContinueWith(delegate(Task<TResult> t, object subjectObject) {
TaskObservableExtensions.EmitTaskResult<TResult>(t, (IObserver<TResult>)subjectObject);
}, observer, cancellationDisposable.Token, taskContinuationOptions, TaskScheduler.Current);
else
System.Threading.Tasks.TaskExtensions.ContinueWithState<TResult, (IScheduler, IObserver<TResult>)>(_task, (Action<Task<TResult>, (IScheduler, IObserver<TResult>)>)delegate(Task<TResult> task, (IScheduler scheduler, IObserver<TResult> observer) tuple) {
Scheduler.ScheduleAction<(Task<TResult>, IObserver<TResult>)>(tuple.scheduler, (task, tuple.observer), (Action<(Task<TResult>, IObserver<TResult>)>)delegate((Task<TResult> task, IObserver<TResult> observer) tuple2) {
TaskObservableExtensions.EmitTaskResult<TResult>(tuple2.task, tuple2.observer);
});
}, (_scheduler, observer), taskContinuationOptions, cancellationDisposable.Token);
if (_ignoreExceptionsAfterUnsubscribe)
_task.ContinueWith((Task<TResult> t) => t.Exception, TaskContinuationOptions.OnlyOnFaulted);
return cancellationDisposable;
}
}
[System.Runtime.CompilerServices.Nullable(new byte[] {
0,
1
})]
private sealed class ToTaskObserver<[System.Runtime.CompilerServices.Nullable(2)] TResult> : SafeObserver<TResult>
{
private readonly CancellationToken _ct;
private readonly TaskCompletionSource<TResult> _tcs;
private readonly CancellationTokenRegistration _ctr;
private bool _hasValue;
[System.Runtime.CompilerServices.Nullable(2)]
private TResult _lastValue;
public ToTaskObserver(TaskCompletionSource<TResult> tcs, CancellationToken ct)
{
_ct = ct;
_tcs = tcs;
if (ct.CanBeCanceled)
_ctr = ct.Register(delegate(object this) {
((ToTaskObserver<TResult>)this).Cancel();
}, this);
}
public override void OnNext(TResult value)
{
_hasValue = true;
_lastValue = value;
}
public override void OnError(Exception error)
{
_tcs.TrySetException(error);
_ctr.Dispose();
Dispose();
}
public override void OnCompleted()
{
if (!_hasValue)
try {
throw new InvalidOperationException(Strings_Linq.NO_ELEMENTS);
} catch (Exception exception) {
_tcs.TrySetException(exception);
}
else
_tcs.TrySetResult(_lastValue);
_ctr.Dispose();
Dispose();
}
private void Cancel()
{
Dispose();
_tcs.TrySetCanceled(_ct);
}
}
public static IObservable<Unit> ToObservable(this Task task)
{
if (task == null)
throw new ArgumentNullException("task");
return ToObservableImpl(task, null, false);
}
public static IObservable<Unit> ToObservable(this Task task, IScheduler scheduler)
{
if (scheduler == null)
throw new ArgumentNullException("scheduler");
return task.ToObservable(new TaskObservationOptions(scheduler, false));
}
public static IObservable<Unit> ToObservable(this Task task, TaskObservationOptions options)
{
if (task == null)
throw new ArgumentNullException("task");
if (options == null)
throw new ArgumentNullException("options");
return ToObservableImpl(task, options.Scheduler, options.IgnoreExceptionsAfterUnsubscribe);
}
internal static IObservable<Unit> ToObservable(this Task task, TaskObservationOptions.Value options)
{
if (task == null)
throw new ArgumentNullException("task");
return ToObservableImpl(task, options.Scheduler, options.IgnoreExceptionsAfterUnsubscribe);
}
private static IObservable<Unit> ToObservableImpl(Task task, [System.Runtime.CompilerServices.Nullable(2)] IScheduler scheduler, bool ignoreExceptionsAfterUnsubscribe)
{
if (task.IsCompleted) {
if (scheduler == null)
scheduler = ImmediateScheduler.Instance;
switch (task.Status) {
case TaskStatus.Faulted:
return new Throw<Unit>(task.GetSingleException(), scheduler);
case TaskStatus.Canceled:
return new Throw<Unit>(new TaskCanceledException(task), scheduler);
default:
return new Return<Unit>(Unit.Default, scheduler);
}
}
return new SlowTaskObservable(task, scheduler, ignoreExceptionsAfterUnsubscribe);
}
private static void EmitTaskResult(this Task task, IObserver<Unit> subject)
{
switch (task.Status) {
case TaskStatus.RanToCompletion:
subject.OnNext(Unit.Default);
subject.OnCompleted();
break;
case TaskStatus.Faulted:
subject.OnError(task.GetSingleException());
break;
case TaskStatus.Canceled:
subject.OnError(new TaskCanceledException(task));
break;
}
}
internal static IDisposable Subscribe(this Task task, IObserver<Unit> observer)
{
if (task.IsCompleted) {
task.EmitTaskResult(observer);
return Disposable.Empty;
}
CancellationDisposable cancellationDisposable = new CancellationDisposable();
task.ContinueWith(delegate(Task t, object observerObject) {
t.EmitTaskResult((IObserver<Unit>)observerObject);
}, observer, cancellationDisposable.Token, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Current);
return cancellationDisposable;
}
public static IObservable<TResult> ToObservable<[System.Runtime.CompilerServices.Nullable(2)] TResult>(this Task<TResult> task)
{
if (task == null)
throw new ArgumentNullException("task");
return ToObservableImpl(task, null, false);
}
public static IObservable<TResult> ToObservable<[System.Runtime.CompilerServices.Nullable(2)] TResult>(this Task<TResult> task, IScheduler scheduler)
{
if (scheduler == null)
throw new ArgumentNullException("scheduler");
return task.ToObservable(new TaskObservationOptions(scheduler, false));
}
public static IObservable<TResult> ToObservable<[System.Runtime.CompilerServices.Nullable(2)] TResult>(this Task<TResult> task, TaskObservationOptions options)
{
if (task == null)
throw new ArgumentNullException("task");
if (options == null)
throw new ArgumentNullException("options");
return ToObservableImpl(task, options.Scheduler, options.IgnoreExceptionsAfterUnsubscribe);
}
internal static IObservable<TResult> ToObservable<[System.Runtime.CompilerServices.Nullable(2)] TResult>(this Task<TResult> task, TaskObservationOptions.Value options)
{
if (task == null)
throw new ArgumentNullException("task");
return ToObservableImpl(task, options.Scheduler, options.IgnoreExceptionsAfterUnsubscribe);
}
private static IObservable<TResult> ToObservableImpl<[System.Runtime.CompilerServices.Nullable(2)] TResult>(Task<TResult> task, [System.Runtime.CompilerServices.Nullable(2)] IScheduler scheduler, bool ignoreExceptionsAfterUnsubscribe)
{
if (task.IsCompleted) {
if (scheduler == null)
scheduler = ImmediateScheduler.Instance;
switch (task.Status) {
case TaskStatus.Faulted:
return new Throw<TResult>(task.GetSingleException(), scheduler);
case TaskStatus.Canceled:
return new Throw<TResult>((Exception)new TaskCanceledException(task), scheduler);
default:
return new Return<TResult>(task.Result, scheduler);
}
}
return new SlowTaskObservable<TResult>(task, scheduler, ignoreExceptionsAfterUnsubscribe);
}
private static void EmitTaskResult<[System.Runtime.CompilerServices.Nullable(2)] TResult>(this Task<TResult> task, IObserver<TResult> subject)
{
switch (task.Status) {
case TaskStatus.RanToCompletion:
subject.OnNext(task.Result);
subject.OnCompleted();
break;
case TaskStatus.Faulted:
subject.OnError(task.GetSingleException());
break;
case TaskStatus.Canceled:
subject.OnError((Exception)new TaskCanceledException(task));
break;
}
}
[System.Runtime.CompilerServices.NullableContext(2)]
private static TaskContinuationOptions GetTaskContinuationOptions(IScheduler scheduler)
{
TaskContinuationOptions taskContinuationOptions = TaskContinuationOptions.None;
if (scheduler != null)
taskContinuationOptions |= TaskContinuationOptions.ExecuteSynchronously;
return taskContinuationOptions;
}
internal static IDisposable Subscribe<[System.Runtime.CompilerServices.Nullable(2)] TResult>(this Task<TResult> task, IObserver<TResult> observer)
{
if (task.IsCompleted) {
task.EmitTaskResult(observer);
return Disposable.Empty;
}
CancellationDisposable cancellationDisposable = new CancellationDisposable();
task.ContinueWith((Action<Task<TResult>, object>)delegate(Task<TResult> t, object observerObject) {
TaskObservableExtensions.EmitTaskResult<TResult>(t, (IObserver<TResult>)observerObject);
}, (object)observer, cancellationDisposable.Token, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Current);
return cancellationDisposable;
}
public static Task<TResult> ToTask<[System.Runtime.CompilerServices.Nullable(2)] TResult>(this IObservable<TResult> observable)
{
if (observable == null)
throw new ArgumentNullException("observable");
return observable.ToTask(default(CancellationToken), (object)null);
}
public static Task<TResult> ToTask<[System.Runtime.CompilerServices.Nullable(2)] TResult>(this IObservable<TResult> observable, IScheduler scheduler)
{
return observable.ToTask().ContinueOnScheduler(scheduler);
}
public static Task<TResult> ToTask<[System.Runtime.CompilerServices.Nullable(2)] TResult>(this IObservable<TResult> observable, [System.Runtime.CompilerServices.Nullable(2)] object state)
{
if (observable == null)
throw new ArgumentNullException("observable");
return observable.ToTask(default(CancellationToken), state);
}
public static Task<TResult> ToTask<[System.Runtime.CompilerServices.Nullable(2)] TResult>(this IObservable<TResult> observable, [System.Runtime.CompilerServices.Nullable(2)] object state, IScheduler scheduler)
{
return observable.ToTask(default(CancellationToken), state).ContinueOnScheduler(scheduler);
}
public static Task<TResult> ToTask<[System.Runtime.CompilerServices.Nullable(2)] TResult>(this IObservable<TResult> observable, CancellationToken cancellationToken)
{
if (observable == null)
throw new ArgumentNullException("observable");
return observable.ToTask(cancellationToken, (object)null);
}
public static Task<TResult> ToTask<[System.Runtime.CompilerServices.Nullable(2)] TResult>(this IObservable<TResult> observable, CancellationToken cancellationToken, IScheduler scheduler)
{
return observable.ToTask(cancellationToken, (object)null).ContinueOnScheduler(scheduler);
}
internal static Task<TResult> ContinueOnScheduler<[System.Runtime.CompilerServices.Nullable(2)] TResult>(this Task<TResult> task, IScheduler scheduler)
{
if (scheduler == null)
throw new ArgumentNullException("scheduler");
TaskCompletionSource<TResult> taskCompletionSource = new TaskCompletionSource<TResult>(task.AsyncState);
task.ContinueWith((Action<Task<TResult>, object>)delegate(Task<TResult> t, object o) {
(IScheduler, TaskCompletionSource<TResult>) obj = ((IScheduler, TaskCompletionSource<TResult>))o;
IScheduler item = obj.Item1;
TaskCompletionSource<TResult> item2 = obj.Item2;
Scheduler.ScheduleAction<(Task<TResult>, TaskCompletionSource<TResult>)>(item, (t, item2), (Action<(Task<TResult>, TaskCompletionSource<TResult>)>)delegate((Task<TResult> t, TaskCompletionSource<TResult> tcs) state) {
if (state.t.IsCanceled)
state.tcs.TrySetCanceled(new TaskCanceledException(state.t).CancellationToken);
else if (state.t.IsFaulted) {
state.tcs.TrySetException(state.t.GetSingleException());
} else {
state.tcs.TrySetResult(state.t.Result);
}
});
}, (object)(scheduler, taskCompletionSource), TaskContinuationOptions.ExecuteSynchronously);
return taskCompletionSource.Task;
}
public static Task<TResult> ToTask<[System.Runtime.CompilerServices.Nullable(2)] TResult>(this IObservable<TResult> observable, CancellationToken cancellationToken, [System.Runtime.CompilerServices.Nullable(2)] object state)
{
if (observable == null)
throw new ArgumentNullException("observable");
TaskCompletionSource<TResult> taskCompletionSource = new TaskCompletionSource<TResult>(state);
ToTaskObserver<TResult> toTaskObserver = new ToTaskObserver<TResult>(taskCompletionSource, cancellationToken);
try {
((SafeObserver<TResult>)toTaskObserver).SetResource(observable.Subscribe((IObserver<TResult>)toTaskObserver));
} catch (Exception exception) {
taskCompletionSource.TrySetException(exception);
}
return taskCompletionSource.Task;
}
public static Task<TResult> ToTask<[System.Runtime.CompilerServices.Nullable(2)] TResult>(this IObservable<TResult> observable, CancellationToken cancellationToken, [System.Runtime.CompilerServices.Nullable(2)] object state, IScheduler scheduler)
{
return observable.ToTask(cancellationToken, state).ContinueOnScheduler(scheduler);
}
}
}