TaskObservableExtensions
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Linq.ObservableImpl;
using System.Reactive.Subjects;
using System.Threading;
using System.Threading.Tasks;
namespace System.Reactive.Threading.Tasks
{
public static class TaskObservableExtensions
{
private sealed class ToTaskObserver<TResult> : IObserver<TResult>
{
private readonly CancellationToken _ct;
private readonly IDisposable _disposable;
private readonly TaskCompletionSource<TResult> _tcs;
private readonly CancellationTokenRegistration _ctr;
private bool _hasValue;
private TResult _lastValue;
public ToTaskObserver(TaskCompletionSource<TResult> tcs, IDisposable disposable, CancellationToken ct)
{
_ct = ct;
_tcs = tcs;
_disposable = disposable;
if (ct.CanBeCanceled)
_ctr = ct.Register(Cancel);
}
public void OnNext(TResult value)
{
_hasValue = true;
_lastValue = value;
}
public void OnError(Exception error)
{
_tcs.TrySetException(error);
_ctr.Dispose();
_disposable.Dispose();
}
public void OnCompleted()
{
if (_hasValue)
_tcs.TrySetResult(_lastValue);
else
_tcs.TrySetException(new InvalidOperationException(Strings_Linq.NO_ELEMENTS));
_ctr.Dispose();
_disposable.Dispose();
}
private void Cancel()
{
_disposable.Dispose();
_tcs.TrySetCanceled(_ct);
}
}
public static IObservable<Unit> ToObservable(this Task task)
{
if (task == null)
throw new ArgumentNullException("task");
return ToObservableImpl(task, null);
}
public static IObservable<Unit> ToObservable(this Task task, IScheduler scheduler)
{
if (task == null)
throw new ArgumentNullException("task");
if (scheduler == null)
throw new ArgumentNullException("scheduler");
return ToObservableImpl(task, scheduler);
}
private static IObservable<Unit> ToObservableImpl(Task task, IScheduler scheduler)
{
IObservable<Unit> result = null;
if (task.IsCompleted) {
scheduler = (scheduler ?? ImmediateScheduler.Instance);
switch (task.Status) {
case TaskStatus.RanToCompletion:
result = new Return<Unit>(Unit.Default, scheduler);
break;
case TaskStatus.Faulted:
result = new Throw<Unit>(task.Exception.InnerException, scheduler);
break;
case TaskStatus.Canceled:
result = new Throw<Unit>(new TaskCanceledException(task), scheduler);
break;
}
} else
result = ToObservableSlow(task, scheduler);
return result;
}
private static IObservable<Unit> ToObservableSlow(Task task, IScheduler scheduler)
{
AsyncSubject<Unit> subject = new AsyncSubject<Unit>();
TaskContinuationOptions taskContinuationOptions = GetTaskContinuationOptions(scheduler);
task.ContinueWith(delegate {
ToObservableDone(task, subject);
}, taskContinuationOptions);
return ToObservableResult(subject, scheduler);
}
private static void ToObservableDone(Task task, IObserver<Unit> subject)
{
switch (task.Status) {
case TaskStatus.RanToCompletion:
subject.OnNext(Unit.Default);
subject.OnCompleted();
break;
case TaskStatus.Faulted:
subject.OnError(task.Exception.InnerException);
break;
case TaskStatus.Canceled:
subject.OnError(new TaskCanceledException(task));
break;
}
}
public static IObservable<TResult> ToObservable<TResult>(this Task<TResult> task)
{
if (task == null)
throw new ArgumentNullException("task");
return ToObservableImpl(task, null);
}
public static IObservable<TResult> ToObservable<TResult>(this Task<TResult> task, IScheduler scheduler)
{
if (task == null)
throw new ArgumentNullException("task");
if (scheduler == null)
throw new ArgumentNullException("scheduler");
return ToObservableImpl(task, scheduler);
}
private static IObservable<TResult> ToObservableImpl<TResult>(Task<TResult> task, IScheduler scheduler)
{
IObservable<TResult> result = null;
if (task.IsCompleted) {
scheduler = (scheduler ?? ImmediateScheduler.Instance);
switch (task.Status) {
case TaskStatus.RanToCompletion:
result = new Return<TResult>(task.Result, scheduler);
break;
case TaskStatus.Faulted:
result = new Throw<TResult>(task.Exception.InnerException, scheduler);
break;
case TaskStatus.Canceled:
result = new Throw<TResult>((Exception)new TaskCanceledException(task), scheduler);
break;
}
} else
result = ToObservableSlow(task, scheduler);
return result;
}
private static IObservable<TResult> ToObservableSlow<TResult>(Task<TResult> task, IScheduler scheduler)
{
AsyncSubject<TResult> subject = (AsyncSubject<TResult>)new AsyncSubject<TResult>();
TaskContinuationOptions taskContinuationOptions = GetTaskContinuationOptions(scheduler);
task.ContinueWith((Action<Task<TResult>>)delegate {
TaskObservableExtensions.ToObservableDone<TResult>(task, (IObserver<TResult>)subject);
}, taskContinuationOptions);
return ToObservableResult((AsyncSubject<TResult>)subject, scheduler);
}
private static void ToObservableDone<TResult>(Task<TResult> task, IObserver<TResult> subject)
{
switch (task.Status) {
case TaskStatus.RanToCompletion:
subject.OnNext(task.Result);
subject.OnCompleted();
break;
case TaskStatus.Faulted:
subject.OnError(task.Exception.InnerException);
break;
case TaskStatus.Canceled:
subject.OnError((Exception)new TaskCanceledException(task));
break;
}
}
private static TaskContinuationOptions GetTaskContinuationOptions(IScheduler scheduler)
{
TaskContinuationOptions taskContinuationOptions = TaskContinuationOptions.None;
if (scheduler != null)
taskContinuationOptions |= TaskContinuationOptions.ExecuteSynchronously;
return taskContinuationOptions;
}
private static IObservable<TResult> ToObservableResult<TResult>(AsyncSubject<TResult> subject, IScheduler scheduler)
{
if (scheduler != null)
return subject.ObserveOn(scheduler);
return subject.AsObservable();
}
public static Task<TResult> ToTask<TResult>(this IObservable<TResult> observable)
{
if (observable == null)
throw new ArgumentNullException("observable");
return observable.ToTask(default(CancellationToken), null);
}
public static Task<TResult> ToTask<TResult>(this IObservable<TResult> observable, object state)
{
if (observable == null)
throw new ArgumentNullException("observable");
return observable.ToTask(default(CancellationToken), state);
}
public static Task<TResult> ToTask<TResult>(this IObservable<TResult> observable, CancellationToken cancellationToken)
{
if (observable == null)
throw new ArgumentNullException("observable");
return observable.ToTask(cancellationToken, null);
}
public static Task<TResult> ToTask<TResult>(this IObservable<TResult> observable, CancellationToken cancellationToken, object state)
{
if (observable == null)
throw new ArgumentNullException("observable");
TaskCompletionSource<TResult> taskCompletionSource = new TaskCompletionSource<TResult>(state);
SingleAssignmentDisposable singleAssignmentDisposable = new SingleAssignmentDisposable();
ToTaskObserver<TResult> observer = new ToTaskObserver<TResult>(taskCompletionSource, (IDisposable)singleAssignmentDisposable, cancellationToken);
try {
singleAssignmentDisposable.Disposable = observable.Subscribe((IObserver<TResult>)observer);
} catch (Exception exception) {
taskCompletionSource.TrySetException(exception);
}
return taskCompletionSource.Task;
}
}
}