<PackageReference Include="System.Reactive" Version="4.1.0-preview.330" />

TaskObservableExtensions

public static class TaskObservableExtensions
Provides a set of static methods for converting tasks to observable sequences.
using System.Reactive.Concurrency; using System.Reactive.Disposables; using System.Reactive.Linq.ObservableImpl; using System.Threading; using System.Threading.Tasks; namespace System.Reactive.Threading.Tasks { public static class TaskObservableExtensions { private sealed class SlowTaskObservable : IObservable<Unit> { private readonly Task _task; private readonly IScheduler _scheduler; public SlowTaskObservable(Task task, IScheduler scheduler) { _task = task; _scheduler = scheduler; } public IDisposable Subscribe(IObserver<Unit> observer) { if (observer == null) throw new ArgumentNullException("observer"); CancellationDisposable cancellationDisposable = new CancellationDisposable(); TaskContinuationOptions taskContinuationOptions = GetTaskContinuationOptions(_scheduler); if (_scheduler == null) _task.ContinueWith(delegate(Task t, object subjectObject) { t.EmitTaskResult((IObserver<Unit>)subjectObject); }, observer, cancellationDisposable.Token, taskContinuationOptions, TaskScheduler.Current); else _task.ContinueWithState(delegate(Task task, (SlowTaskObservable this, IObserver<Unit> observer) tuple) { tuple.this._scheduler.ScheduleAction((task, tuple.observer), delegate((Task task, IObserver<Unit> observer) tuple2) { tuple2.task.EmitTaskResult(tuple2.observer); }); }, (this, observer), cancellationDisposable.Token, taskContinuationOptions); return cancellationDisposable; } } private sealed class SlowTaskObservable<TResult> : IObservable<TResult> { private readonly Task<TResult> _task; private readonly IScheduler _scheduler; public SlowTaskObservable(Task<TResult> task, IScheduler scheduler) { _task = task; _scheduler = scheduler; } public IDisposable Subscribe(IObserver<TResult> observer) { if (observer == null) throw new ArgumentNullException("observer"); CancellationDisposable cancellationDisposable = new CancellationDisposable(); TaskContinuationOptions taskContinuationOptions = GetTaskContinuationOptions(_scheduler); if (_scheduler == null) _task.ContinueWith(delegate(Task<TResult> t, object subjectObject) { TaskObservableExtensions.EmitTaskResult<TResult>(t, (IObserver<TResult>)subjectObject); }, observer, cancellationDisposable.Token, taskContinuationOptions, TaskScheduler.Current); else System.Threading.Tasks.TaskExtensions.ContinueWithState<TResult, (SlowTaskObservable<TResult>, IObserver<TResult>)>(_task, (Action<Task<TResult>, (SlowTaskObservable<TResult>, IObserver<TResult>)>)delegate(Task<TResult> task, (SlowTaskObservable<TResult> this, IObserver<TResult> observer) tuple) { Scheduler.ScheduleAction<(Task<TResult>, IObserver<TResult>)>(tuple.this._scheduler, (task, tuple.observer), (Action<(Task<TResult>, IObserver<TResult>)>)delegate((Task<TResult> task, IObserver<TResult> observer) tuple2) { TaskObservableExtensions.EmitTaskResult<TResult>(tuple2.task, tuple2.observer); }); }, (this, observer), cancellationDisposable.Token, taskContinuationOptions); return cancellationDisposable; } } private sealed class ToTaskObserver<TResult> : SafeObserver<TResult> { private readonly CancellationToken _ct; private readonly TaskCompletionSource<TResult> _tcs; private readonly CancellationTokenRegistration _ctr; private bool _hasValue; private TResult _lastValue; public ToTaskObserver(TaskCompletionSource<TResult> tcs, CancellationToken ct) { _ct = ct; _tcs = tcs; if (ct.CanBeCanceled) _ctr = ct.Register(delegate(object this) { ((ToTaskObserver<TResult>)this).Cancel(); }, this); } public override void OnNext(TResult value) { _hasValue = true; _lastValue = value; } public override void OnError(Exception error) { _tcs.TrySetException(error); _ctr.Dispose(); Dispose(); } public override void OnCompleted() { if (_hasValue) _tcs.TrySetResult(_lastValue); else _tcs.TrySetException(new InvalidOperationException(Strings_Linq.NO_ELEMENTS)); _ctr.Dispose(); Dispose(); } private void Cancel() { Dispose(); _tcs.TrySetCanceled(_ct); } } public static IObservable<Unit> ToObservable(this Task task) { if (task == null) throw new ArgumentNullException("task"); return ToObservableImpl(task, null); } public static IObservable<Unit> ToObservable(this Task task, IScheduler scheduler) { if (task == null) throw new ArgumentNullException("task"); if (scheduler == null) throw new ArgumentNullException("scheduler"); return ToObservableImpl(task, scheduler); } private static IObservable<Unit> ToObservableImpl(Task task, IScheduler scheduler) { if (task.IsCompleted) { scheduler = (scheduler ?? ImmediateScheduler.Instance); switch (task.Status) { case TaskStatus.Faulted: return new Throw<Unit>(task.Exception.InnerException, scheduler); case TaskStatus.Canceled: return new Throw<Unit>(new TaskCanceledException(task), scheduler); default: return new Return<Unit>(Unit.Default, scheduler); } } return new SlowTaskObservable(task, scheduler); } private static void EmitTaskResult(this Task task, IObserver<Unit> subject) { switch (task.Status) { case TaskStatus.RanToCompletion: subject.OnNext(Unit.Default); subject.OnCompleted(); break; case TaskStatus.Faulted: subject.OnError(task.Exception.InnerException); break; case TaskStatus.Canceled: subject.OnError(new TaskCanceledException(task)); break; } } internal static IDisposable Subscribe(this Task task, IObserver<Unit> observer) { if (task.IsCompleted) { task.EmitTaskResult(observer); return Disposable.Empty; } CancellationDisposable cancellationDisposable = new CancellationDisposable(); task.ContinueWith(delegate(Task t, object observerObject) { t.EmitTaskResult((IObserver<Unit>)observerObject); }, observer, cancellationDisposable.Token, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Current); return cancellationDisposable; } public static IObservable<TResult> ToObservable<TResult>(this Task<TResult> task) { if (task == null) throw new ArgumentNullException("task"); return ToObservableImpl(task, null); } public static IObservable<TResult> ToObservable<TResult>(this Task<TResult> task, IScheduler scheduler) { if (task == null) throw new ArgumentNullException("task"); if (scheduler == null) throw new ArgumentNullException("scheduler"); return ToObservableImpl(task, scheduler); } private static IObservable<TResult> ToObservableImpl<TResult>(Task<TResult> task, IScheduler scheduler) { if (task.IsCompleted) { scheduler = (scheduler ?? ImmediateScheduler.Instance); switch (task.Status) { case TaskStatus.Faulted: return new Throw<TResult>(task.Exception.InnerException, scheduler); case TaskStatus.Canceled: return new Throw<TResult>((Exception)new TaskCanceledException(task), scheduler); default: return new Return<TResult>(task.Result, scheduler); } } return new SlowTaskObservable<TResult>(task, scheduler); } private static void EmitTaskResult<TResult>(this Task<TResult> task, IObserver<TResult> subject) { switch (task.Status) { case TaskStatus.RanToCompletion: subject.OnNext(task.Result); subject.OnCompleted(); break; case TaskStatus.Faulted: subject.OnError(task.Exception.InnerException); break; case TaskStatus.Canceled: subject.OnError((Exception)new TaskCanceledException(task)); break; } } private static TaskContinuationOptions GetTaskContinuationOptions(IScheduler scheduler) { TaskContinuationOptions taskContinuationOptions = TaskContinuationOptions.None; if (scheduler != null) taskContinuationOptions |= TaskContinuationOptions.ExecuteSynchronously; return taskContinuationOptions; } internal static IDisposable Subscribe<TResult>(this Task<TResult> task, IObserver<TResult> observer) { if (task.IsCompleted) { task.EmitTaskResult(observer); return Disposable.Empty; } CancellationDisposable cancellationDisposable = new CancellationDisposable(); task.ContinueWith((Action<Task<TResult>, object>)delegate(Task<TResult> t, object observerObject) { TaskObservableExtensions.EmitTaskResult<TResult>(t, (IObserver<TResult>)observerObject); }, (object)observer, cancellationDisposable.Token, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Current); return cancellationDisposable; } public static Task<TResult> ToTask<TResult>(this IObservable<TResult> observable) { if (observable == null) throw new ArgumentNullException("observable"); return observable.ToTask(default(CancellationToken), null); } public static Task<TResult> ToTask<TResult>(this IObservable<TResult> observable, object state) { if (observable == null) throw new ArgumentNullException("observable"); return observable.ToTask(default(CancellationToken), state); } public static Task<TResult> ToTask<TResult>(this IObservable<TResult> observable, CancellationToken cancellationToken) { if (observable == null) throw new ArgumentNullException("observable"); return observable.ToTask(cancellationToken, null); } public static Task<TResult> ToTask<TResult>(this IObservable<TResult> observable, CancellationToken cancellationToken, object state) { if (observable == null) throw new ArgumentNullException("observable"); TaskCompletionSource<TResult> taskCompletionSource = new TaskCompletionSource<TResult>(state); ToTaskObserver<TResult> toTaskObserver = new ToTaskObserver<TResult>(taskCompletionSource, cancellationToken); try { ((SafeObserver<TResult>)toTaskObserver).SetResource(observable.Subscribe((IObserver<TResult>)toTaskObserver)); } catch (Exception exception) { taskCompletionSource.TrySetException(exception); } return taskCompletionSource.Task; } } }