TaskObservableExtensions
Provides a set of static methods for converting tasks to observable sequences and observable sequences to tasks.
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Reactive.Linq.ObservableImpl;
using System.Threading;
using System.Threading.Tasks;
namespace System.Reactive.Threading.Tasks
{
public static class TaskObservableExtensions
{
/// <summary>
/// Observable wrapper around a <see cref="Task"/>. Every subscription attaches its own
/// continuation to the task and reports the task's outcome to the subscribing observer.
/// </summary>
private sealed class SlowTaskObservable : IObservable<Unit>
{
    private readonly Task _task;

    // May be null: in that case the outcome is emitted directly from the task continuation.
    private readonly IScheduler _scheduler;

    /// <summary>
    /// Creates the wrapper.
    /// </summary>
    /// <param name="task">Task whose outcome is reported to subscribers.</param>
    /// <param name="scheduler">Scheduler used to deliver the outcome, or null to deliver it from the continuation itself.</param>
    public SlowTaskObservable(Task task, IScheduler scheduler)
    {
        _task = task;
        _scheduler = scheduler;
    }

    /// <summary>
    /// Registers a continuation on the wrapped task that forwards its outcome to <paramref name="observer"/>.
    /// </summary>
    /// <param name="observer">Observer that receives the task outcome.</param>
    /// <returns>
    /// The <see cref="CancellationDisposable"/> whose token was handed to the continuation registration
    /// (presumably disposing it cancels a continuation that has not run yet - confirm against CancellationDisposable).
    /// </returns>
    /// <exception cref="ArgumentNullException"><paramref name="observer"/> is null.</exception>
    public IDisposable Subscribe(IObserver<Unit> observer)
    {
        if (observer == null)
        {
            throw new ArgumentNullException(nameof(observer));
        }

        CancellationDisposable cancellation = new CancellationDisposable();
        TaskContinuationOptions options = GetTaskContinuationOptions(_scheduler);

        if (_scheduler == null)
        {
            // The observer travels as continuation state, so the lambda captures nothing.
            _task.ContinueWith(
                (Task t, object subjectObject) =>
                {
                    t.EmitTaskResult((IObserver<Unit>)subjectObject);
                },
                observer,
                cancellation.Token,
                options,
                TaskScheduler.Current);
        }
        else
        {
            // `this` is a reserved word, so the tuple element has to be spelled as the
            // verbatim identifier `@this`. The continuation only hands the work over to
            // the scheduler; the scheduled action performs the actual emission.
            _task.ContinueWithState(
                (Task task, (SlowTaskObservable @this, IObserver<Unit> observer) tuple) =>
                {
                    tuple.@this._scheduler.ScheduleAction(
                        (task: task, observer: tuple.observer),
                        ((Task task, IObserver<Unit> observer) tuple2) =>
                        {
                            tuple2.task.EmitTaskResult(tuple2.observer);
                        });
                },
                (@this: this, observer: observer),
                cancellation.Token,
                options);
        }

        return cancellation;
    }
}
/// <summary>
/// Observable wrapper around a <see cref="Task{TResult}"/>. Every subscription attaches its own
/// continuation to the task and reports the task's outcome to the subscribing observer.
/// </summary>
/// <typeparam name="TResult">Type of the task result and of the emitted element.</typeparam>
private sealed class SlowTaskObservable<TResult> : IObservable<TResult>
{
    private readonly Task<TResult> _task;

    // May be null: in that case the outcome is emitted directly from the task continuation.
    private readonly IScheduler _scheduler;

    /// <summary>
    /// Creates the wrapper.
    /// </summary>
    /// <param name="task">Task whose outcome is reported to subscribers.</param>
    /// <param name="scheduler">Scheduler used to deliver the outcome, or null to deliver it from the continuation itself.</param>
    public SlowTaskObservable(Task<TResult> task, IScheduler scheduler)
    {
        _task = task;
        _scheduler = scheduler;
    }

    /// <summary>
    /// Registers a continuation on the wrapped task that forwards its outcome to <paramref name="observer"/>.
    /// </summary>
    /// <param name="observer">Observer that receives the task outcome.</param>
    /// <returns>
    /// The <see cref="CancellationDisposable"/> whose token was handed to the continuation registration
    /// (presumably disposing it cancels a continuation that has not run yet - confirm against CancellationDisposable).
    /// </returns>
    /// <exception cref="ArgumentNullException"><paramref name="observer"/> is null.</exception>
    public IDisposable Subscribe(IObserver<TResult> observer)
    {
        if (observer == null)
        {
            throw new ArgumentNullException(nameof(observer));
        }

        CancellationDisposable cancellation = new CancellationDisposable();
        TaskContinuationOptions options = GetTaskContinuationOptions(_scheduler);

        if (_scheduler == null)
        {
            // The observer travels as continuation state, so the lambda captures nothing.
            _task.ContinueWith(
                (Task<TResult> t, object subjectObject) =>
                {
                    t.EmitTaskResult((IObserver<TResult>)subjectObject);
                },
                observer,
                cancellation.Token,
                options,
                TaskScheduler.Current);
        }
        else
        {
            // `this` is a reserved word, so the tuple element has to be spelled as the
            // verbatim identifier `@this`. Extension-method syntax is used here to match
            // the non-generic SlowTaskObservable. The explicitly typed lambda parameters
            // pin the Task<TResult> overload of ContinueWithState.
            _task.ContinueWithState(
                (Task<TResult> task, (SlowTaskObservable<TResult> @this, IObserver<TResult> observer) tuple) =>
                {
                    tuple.@this._scheduler.ScheduleAction(
                        (task: task, observer: tuple.observer),
                        ((Task<TResult> task, IObserver<TResult> observer) tuple2) =>
                        {
                            tuple2.task.EmitTaskResult(tuple2.observer);
                        });
                },
                (@this: this, observer: observer),
                cancellation.Token,
                options);
        }

        return cancellation;
    }
}
/// <summary>
/// Observer that remembers the most recent element of a sequence and settles a
/// <see cref="TaskCompletionSource{TResult}"/> when the sequence terminates or the
/// supplied cancellation token fires.
/// </summary>
/// <typeparam name="TResult">Element type of the observed sequence and result type of the task.</typeparam>
private sealed class ToTaskObserver<TResult> : SafeObserver<TResult>
{
    private readonly CancellationToken _ct;
    private readonly TaskCompletionSource<TResult> _tcs;

    // Left at its default value when the token can never be canceled.
    private readonly CancellationTokenRegistration _ctr;

    private bool _hasValue;
    private TResult _lastValue;

    /// <summary>
    /// Creates the observer and, when <paramref name="ct"/> can be canceled, registers a cancellation callback.
    /// </summary>
    /// <param name="tcs">Completion source settled by this observer.</param>
    /// <param name="ct">Token whose cancellation cancels the task.</param>
    public ToTaskObserver(TaskCompletionSource<TResult> tcs, CancellationToken ct)
    {
        _ct = ct;
        _tcs = tcs;

        if (ct.CanBeCanceled)
        {
            // `this` is a reserved word, so the callback parameter is the verbatim
            // identifier `@this`. The observer is passed as callback state so that
            // the lambda captures nothing.
            _ctr = ct.Register(
                (object @this) =>
                {
                    ((ToTaskObserver<TResult>)@this).Cancel();
                },
                this);
        }
    }

    /// <summary>Records <paramref name="value"/> as the latest element; earlier elements are overwritten.</summary>
    public override void OnNext(TResult value)
    {
        _hasValue = true;
        _lastValue = value;
    }

    /// <summary>Faults the task with <paramref name="error"/>, then releases the cancellation registration and this observer.</summary>
    public override void OnError(Exception error)
    {
        _tcs.TrySetException(error);
        _ctr.Dispose();
        Dispose();
    }

    /// <summary>
    /// Completes the task with the last element seen, or faults it with an
    /// <see cref="InvalidOperationException"/> when no element was received.
    /// </summary>
    public override void OnCompleted()
    {
        if (_hasValue)
        {
            _tcs.TrySetResult(_lastValue);
        }
        else
        {
            _tcs.TrySetException(new InvalidOperationException(Strings_Linq.NO_ELEMENTS));
        }

        _ctr.Dispose();
        Dispose();
    }

    // Cancellation callback: disposes this observer first, then marks the task as canceled with the original token.
    private void Cancel()
    {
        Dispose();
        _tcs.TrySetCanceled(_ct);
    }
}
/// <summary>
/// Returns an observable sequence that reports the outcome of <paramref name="task"/>.
/// No scheduler is specified for delivering the notifications.
/// </summary>
/// <param name="task">Task to convert.</param>
/// <returns>An observable sequence of <see cref="Unit"/> built from the task.</returns>
/// <exception cref="ArgumentNullException"><paramref name="task"/> is null.</exception>
public static IObservable<Unit> ToObservable(this Task task)
{
    if (task is null)
    {
        throw new ArgumentNullException(nameof(task));
    }

    // A null scheduler tells the implementation that the caller did not request one.
    return ToObservableImpl(task, null);
}
/// <summary>
/// Returns an observable sequence that reports the outcome of <paramref name="task"/>,
/// delivering notifications through <paramref name="scheduler"/>.
/// </summary>
/// <param name="task">Task to convert.</param>
/// <param name="scheduler">Scheduler passed on to the conversion.</param>
/// <returns>An observable sequence of <see cref="Unit"/> built from the task.</returns>
/// <exception cref="ArgumentNullException"><paramref name="task"/> or <paramref name="scheduler"/> is null.</exception>
public static IObservable<Unit> ToObservable(this Task task, IScheduler scheduler)
{
    // Arguments are validated in declaration order, so a null task is reported first.
    if (task is null)
    {
        throw new ArgumentNullException(nameof(task));
    }

    if (scheduler is null)
    {
        throw new ArgumentNullException(nameof(scheduler));
    }

    return ToObservableImpl(task, scheduler);
}
/// <summary>
/// Builds the observable for <paramref name="task"/>. A task that is already complete is mapped
/// straight to a <c>Throw</c> or <c>Return</c> sequence; otherwise a <c>SlowTaskObservable</c> is returned.
/// </summary>
/// <param name="task">Task to convert; callers have already checked it for null.</param>
/// <param name="scheduler">Requested scheduler, or null when none was requested.</param>
/// <returns>The observable representing the task outcome.</returns>
private static IObservable<Unit> ToObservableImpl(Task task, IScheduler scheduler)
{
    if (!task.IsCompleted)
    {
        // Still running: the scheduler is passed through unchanged (it may be null).
        return new SlowTaskObservable(task, scheduler);
    }

    // For a finished task, fall back to the immediate scheduler when none was given.
    IScheduler effectiveScheduler = scheduler ?? ImmediateScheduler.Instance;
    TaskStatus status = task.Status;

    if (status == TaskStatus.Faulted)
    {
        return new Throw<Unit>(task.Exception.InnerException, effectiveScheduler);
    }

    if (status == TaskStatus.Canceled)
    {
        return new Throw<Unit>(new TaskCanceledException(task), effectiveScheduler);
    }

    // Any other completed status yields a single Unit value.
    return new Return<Unit>(Unit.Default, effectiveScheduler);
}
/// <summary>
/// Translates the current status of <paramref name="task"/> into observer notifications.
/// Statuses other than RanToCompletion, Faulted and Canceled produce no notification.
/// </summary>
/// <param name="task">Task whose status is inspected.</param>
/// <param name="subject">Observer that receives the notifications.</param>
private static void EmitTaskResult(this Task task, IObserver<Unit> subject)
{
    TaskStatus status = task.Status;

    if (status == TaskStatus.RanToCompletion)
    {
        // Success: one Unit element followed by completion.
        subject.OnNext(Unit.Default);
        subject.OnCompleted();
    }
    else if (status == TaskStatus.Faulted)
    {
        // Only the first inner exception of the AggregateException is surfaced.
        subject.OnError(task.Exception.InnerException);
    }
    else if (status == TaskStatus.Canceled)
    {
        subject.OnError(new TaskCanceledException(task));
    }
}
/// <summary>
/// Forwards the outcome of <paramref name="task"/> to <paramref name="observer"/>: immediately when
/// the task is already complete, otherwise from a continuation attached to the task.
/// </summary>
/// <param name="task">Task to observe.</param>
/// <param name="observer">Observer that receives the task outcome.</param>
/// <returns>
/// <c>Disposable.Empty</c> when the outcome was emitted synchronously; otherwise the
/// <see cref="CancellationDisposable"/> whose token was handed to the continuation registration.
/// </returns>
internal static IDisposable Subscribe(this Task task, IObserver<Unit> observer)
{
    if (!task.IsCompleted)
    {
        CancellationDisposable cancellation = new CancellationDisposable();

        // The observer is passed as continuation state so that the lambda captures nothing.
        task.ContinueWith(
            (Task t, object observerObject) =>
            {
                t.EmitTaskResult((IObserver<Unit>)observerObject);
            },
            observer,
            cancellation.Token,
            TaskContinuationOptions.ExecuteSynchronously,
            TaskScheduler.Current);

        return cancellation;
    }

    // Already finished: emit right away; there is nothing to cancel afterwards.
    task.EmitTaskResult(observer);
    return Disposable.Empty;
}
/// <summary>
/// Returns an observable sequence that reports the outcome of <paramref name="task"/>.
/// No scheduler is specified for delivering the notifications.
/// </summary>
/// <typeparam name="TResult">Type of the task result.</typeparam>
/// <param name="task">Task to convert.</param>
/// <returns>An observable sequence built from the task.</returns>
/// <exception cref="ArgumentNullException"><paramref name="task"/> is null.</exception>
public static IObservable<TResult> ToObservable<TResult>(this Task<TResult> task)
{
    if (task is null)
    {
        throw new ArgumentNullException(nameof(task));
    }

    // A null scheduler tells the implementation that the caller did not request one.
    return ToObservableImpl(task, null);
}
/// <summary>
/// Returns an observable sequence that reports the outcome of <paramref name="task"/>,
/// delivering notifications through <paramref name="scheduler"/>.
/// </summary>
/// <typeparam name="TResult">Type of the task result.</typeparam>
/// <param name="task">Task to convert.</param>
/// <param name="scheduler">Scheduler passed on to the conversion.</param>
/// <returns>An observable sequence built from the task.</returns>
/// <exception cref="ArgumentNullException"><paramref name="task"/> or <paramref name="scheduler"/> is null.</exception>
public static IObservable<TResult> ToObservable<TResult>(this Task<TResult> task, IScheduler scheduler)
{
    // Arguments are validated in declaration order, so a null task is reported first.
    if (task is null)
    {
        throw new ArgumentNullException(nameof(task));
    }

    if (scheduler is null)
    {
        throw new ArgumentNullException(nameof(scheduler));
    }

    return ToObservableImpl(task, scheduler);
}
/// <summary>
/// Builds the observable for <paramref name="task"/>. A task that is already complete is mapped
/// straight to a <c>Throw</c> or <c>Return</c> sequence; otherwise a <c>SlowTaskObservable</c> is returned.
/// </summary>
/// <typeparam name="TResult">Type of the task result.</typeparam>
/// <param name="task">Task to convert; callers have already checked it for null.</param>
/// <param name="scheduler">Requested scheduler, or null when none was requested.</param>
/// <returns>The observable representing the task outcome.</returns>
private static IObservable<TResult> ToObservableImpl<TResult>(Task<TResult> task, IScheduler scheduler)
{
    if (!task.IsCompleted)
    {
        // Still running: the scheduler is passed through unchanged (it may be null).
        return new SlowTaskObservable<TResult>(task, scheduler);
    }

    // For a finished task, fall back to the immediate scheduler when none was given.
    IScheduler effectiveScheduler = scheduler ?? ImmediateScheduler.Instance;
    TaskStatus status = task.Status;

    if (status == TaskStatus.Faulted)
    {
        return new Throw<TResult>(task.Exception.InnerException, effectiveScheduler);
    }

    if (status == TaskStatus.Canceled)
    {
        Exception canceled = new TaskCanceledException(task);
        return new Throw<TResult>(canceled, effectiveScheduler);
    }

    // Any other completed status yields the task's result as the single element.
    return new Return<TResult>(task.Result, effectiveScheduler);
}
/// <summary>
/// Translates the current status of <paramref name="task"/> into observer notifications.
/// Statuses other than RanToCompletion, Faulted and Canceled produce no notification.
/// </summary>
/// <typeparam name="TResult">Type of the task result.</typeparam>
/// <param name="task">Task whose status is inspected.</param>
/// <param name="subject">Observer that receives the notifications.</param>
private static void EmitTaskResult<TResult>(this Task<TResult> task, IObserver<TResult> subject)
{
    TaskStatus status = task.Status;

    if (status == TaskStatus.RanToCompletion)
    {
        // Success: the task result followed by completion.
        subject.OnNext(task.Result);
        subject.OnCompleted();
    }
    else if (status == TaskStatus.Faulted)
    {
        // Only the first inner exception of the AggregateException is surfaced.
        subject.OnError(task.Exception.InnerException);
    }
    else if (status == TaskStatus.Canceled)
    {
        subject.OnError(new TaskCanceledException(task));
    }
}
/// <summary>
/// Chooses the continuation options for a task continuation based on whether a scheduler was supplied.
/// </summary>
/// <param name="scheduler">Requested scheduler, or null when none was requested.</param>
/// <returns>
/// <see cref="TaskContinuationOptions.ExecuteSynchronously"/> when <paramref name="scheduler"/> is not null;
/// otherwise <see cref="TaskContinuationOptions.None"/>.
/// </returns>
private static TaskContinuationOptions GetTaskContinuationOptions(IScheduler scheduler)
{
    // NOTE(review): presumably synchronous execution is requested only with a scheduler because
    // the continuation then merely hands the work over to that scheduler - confirm intent.
    return scheduler == null
        ? TaskContinuationOptions.None
        : TaskContinuationOptions.ExecuteSynchronously;
}
/// <summary>
/// Forwards the outcome of <paramref name="task"/> to <paramref name="observer"/>: immediately when
/// the task is already complete, otherwise from a continuation attached to the task.
/// </summary>
/// <typeparam name="TResult">Type of the task result.</typeparam>
/// <param name="task">Task to observe.</param>
/// <param name="observer">Observer that receives the task outcome.</param>
/// <returns>
/// <c>Disposable.Empty</c> when the outcome was emitted synchronously; otherwise the
/// <see cref="CancellationDisposable"/> whose token was handed to the continuation registration.
/// </returns>
internal static IDisposable Subscribe<TResult>(this Task<TResult> task, IObserver<TResult> observer)
{
    if (!task.IsCompleted)
    {
        CancellationDisposable cancellation = new CancellationDisposable();

        // The observer is passed as continuation state so that the lambda captures nothing.
        // The explicit parameter types select the Action<Task<TResult>, object> overload.
        task.ContinueWith(
            (Task<TResult> t, object observerObject) =>
            {
                t.EmitTaskResult((IObserver<TResult>)observerObject);
            },
            observer,
            cancellation.Token,
            TaskContinuationOptions.ExecuteSynchronously,
            TaskScheduler.Current);

        return cancellation;
    }

    // Already finished: emit right away; there is nothing to cancel afterwards.
    task.EmitTaskResult(observer);
    return Disposable.Empty;
}
/// <summary>
/// Returns a task that reports the outcome of <paramref name="observable"/>, using no
/// cancellation token and no state object.
/// </summary>
/// <typeparam name="TResult">Element type of the sequence.</typeparam>
/// <param name="observable">Sequence to convert.</param>
/// <returns>A task representing the sequence.</returns>
/// <exception cref="ArgumentNullException"><paramref name="observable"/> is null.</exception>
public static Task<TResult> ToTask<TResult>(this IObservable<TResult> observable)
{
    if (observable is null)
    {
        throw new ArgumentNullException(nameof(observable));
    }

    // Delegates to the full overload with a token that can never be canceled and a null state.
    return observable.ToTask(CancellationToken.None, null);
}
/// <summary>
/// Returns a task that reports the outcome of <paramref name="observable"/>, using no
/// cancellation token and attaching <paramref name="state"/> to the task.
/// </summary>
/// <typeparam name="TResult">Element type of the sequence.</typeparam>
/// <param name="observable">Sequence to convert.</param>
/// <param name="state">State object handed to the underlying <see cref="TaskCompletionSource{TResult}"/>.</param>
/// <returns>A task representing the sequence.</returns>
/// <exception cref="ArgumentNullException"><paramref name="observable"/> is null.</exception>
public static Task<TResult> ToTask<TResult>(this IObservable<TResult> observable, object state)
{
    if (observable is null)
    {
        throw new ArgumentNullException(nameof(observable));
    }

    // Delegates to the full overload with a token that can never be canceled.
    return observable.ToTask(CancellationToken.None, state);
}
/// <summary>
/// Returns a task that reports the outcome of <paramref name="observable"/> and can be canceled
/// through <paramref name="cancellationToken"/>; no state object is attached.
/// </summary>
/// <typeparam name="TResult">Element type of the sequence.</typeparam>
/// <param name="observable">Sequence to convert.</param>
/// <param name="cancellationToken">Token used to cancel the task.</param>
/// <returns>A task representing the sequence.</returns>
/// <exception cref="ArgumentNullException"><paramref name="observable"/> is null.</exception>
public static Task<TResult> ToTask<TResult>(this IObservable<TResult> observable, CancellationToken cancellationToken)
{
    if (observable is null)
    {
        throw new ArgumentNullException(nameof(observable));
    }

    // Delegates to the full overload with a null state.
    return observable.ToTask(cancellationToken, null);
}
/// <summary>
/// Subscribes to <paramref name="observable"/> and returns a task that is settled by the subscription:
/// it completes with the last element, faults on error or on an empty sequence, and is canceled
/// when <paramref name="cancellationToken"/> fires.
/// </summary>
/// <typeparam name="TResult">Element type of the sequence.</typeparam>
/// <param name="observable">Sequence to convert.</param>
/// <param name="cancellationToken">Token used to cancel the task.</param>
/// <param name="state">State object handed to the underlying <see cref="TaskCompletionSource{TResult}"/>.</param>
/// <returns>The task of the completion source driven by the subscription.</returns>
/// <exception cref="ArgumentNullException"><paramref name="observable"/> is null.</exception>
public static Task<TResult> ToTask<TResult>(this IObservable<TResult> observable, CancellationToken cancellationToken, object state)
{
    if (observable is null)
    {
        throw new ArgumentNullException(nameof(observable));
    }

    var tcs = new TaskCompletionSource<TResult>(state);
    var observer = new ToTaskObserver<TResult>(tcs, cancellationToken);

    try
    {
        // Subscribe first, then hand the subscription to the observer.
        IDisposable subscription = observable.Subscribe(observer);
        observer.SetResource(subscription);
    }
    catch (Exception ex)
    {
        // Anything thrown while subscribing or storing the subscription is reported
        // through the task instead of escaping to the caller.
        tcs.TrySetException(ex);
    }

    return tcs.Task;
}
}
}