<PackageReference Include="System.Reactive" Version="7.0.0-preview.1" />

TaskObservableExtensions

public static class TaskObservableExtensions
using System.Reactive.Concurrency; using System.Reactive.Disposables; using System.Reactive.Linq.ObservableImpl; using System.Runtime.CompilerServices; using System.Threading; using System.Threading.Tasks; namespace System.Reactive.Threading.Tasks { [NullableContext(1)] [Nullable(0)] public static class TaskObservableExtensions { [Nullable(0)] private sealed class SlowTaskObservable : IObservable<Unit> { private readonly Task _task; [Nullable(2)] private readonly IScheduler _scheduler; private readonly bool _ignoreExceptionsAfterUnsubscribe; public SlowTaskObservable(Task task, [Nullable(2)] IScheduler scheduler, bool ignoreExceptionsAfterUnsubscribe) { _task = task; _scheduler = scheduler; _ignoreExceptionsAfterUnsubscribe = ignoreExceptionsAfterUnsubscribe; } public IDisposable Subscribe(IObserver<Unit> observer) { if (observer == null) throw new ArgumentNullException("observer"); CancellationDisposable cancellationDisposable = new CancellationDisposable(); TaskContinuationOptions taskContinuationOptions = GetTaskContinuationOptions(_scheduler); if (_scheduler == null) _task.ContinueWith(delegate(Task t, object subjectObject) { t.EmitTaskResult((IObserver<Unit>)subjectObject); }, observer, cancellationDisposable.Token, taskContinuationOptions, TaskScheduler.Current); else System.Threading.Tasks.TaskExtensions.ContinueWithState(_task, delegate(Task task, (IScheduler scheduler, IObserver<Unit> observer) tuple) { tuple.scheduler.ScheduleAction((task, tuple.observer), delegate((Task task, IObserver<Unit> observer) tuple2) { tuple2.task.EmitTaskResult(tuple2.observer); }); }, (_scheduler, observer), taskContinuationOptions, cancellationDisposable.Token); if (_ignoreExceptionsAfterUnsubscribe) _task.ContinueWith((Task t) => t.Exception, TaskContinuationOptions.OnlyOnFaulted); return cancellationDisposable; } } [Nullable(0)] private sealed class SlowTaskObservable<[Nullable(2)] TResult> : IObservable<TResult> { private readonly Task<TResult> _task; [Nullable(2)] private readonly IScheduler _scheduler; private readonly bool _ignoreExceptionsAfterUnsubscribe; public SlowTaskObservable(Task<TResult> task, [Nullable(2)] IScheduler scheduler, bool ignoreExceptionsAfterUnsubscribe) { _task = task; _scheduler = scheduler; _ignoreExceptionsAfterUnsubscribe = ignoreExceptionsAfterUnsubscribe; } public IDisposable Subscribe(IObserver<TResult> observer) { if (observer == null) throw new ArgumentNullException("observer"); CancellationDisposable cancellationDisposable = new CancellationDisposable(); TaskContinuationOptions taskContinuationOptions = GetTaskContinuationOptions(_scheduler); if (_scheduler == null) _task.ContinueWith(delegate(Task<TResult> t, object subjectObject) { TaskObservableExtensions.EmitTaskResult<TResult>(t, (IObserver<TResult>)subjectObject); }, observer, cancellationDisposable.Token, taskContinuationOptions, TaskScheduler.Current); else System.Threading.Tasks.TaskExtensions.ContinueWithState<TResult, (IScheduler, IObserver<TResult>)>(_task, (Action<Task<TResult>, (IScheduler, IObserver<TResult>)>)delegate(Task<TResult> task, (IScheduler scheduler, IObserver<TResult> observer) tuple) { Scheduler.ScheduleAction<(Task<TResult>, IObserver<TResult>)>(tuple.scheduler, (task, tuple.observer), (Action<(Task<TResult>, IObserver<TResult>)>)delegate((Task<TResult> task, IObserver<TResult> observer) tuple2) { TaskObservableExtensions.EmitTaskResult<TResult>(tuple2.task, tuple2.observer); }); }, (_scheduler, observer), taskContinuationOptions, cancellationDisposable.Token); if (_ignoreExceptionsAfterUnsubscribe) _task.ContinueWith((Task<TResult> t) => t.Exception, TaskContinuationOptions.OnlyOnFaulted); return cancellationDisposable; } } [Nullable(new byte[] { 0, 1 })] private sealed class ToTaskObserver<[Nullable(2)] TResult> : System.Reactive.SafeObserver<TResult> { private readonly CancellationToken _ct; private readonly TaskCompletionSource<TResult> _tcs; private readonly CancellationTokenRegistration _ctr; private bool _hasValue; [Nullable(2)] private TResult _lastValue; public ToTaskObserver(TaskCompletionSource<TResult> tcs, CancellationToken ct) { _ct = ct; _tcs = tcs; if (ct.CanBeCanceled) _ctr = ct.Register(delegate(object this) { ((ToTaskObserver<TResult>)this).Cancel(); }, this); } public override void OnNext(TResult value) { _hasValue = true; _lastValue = value; } public override void OnError(Exception error) { _tcs.TrySetException(error); _ctr.Dispose(); Dispose(); } public override void OnCompleted() { if (!_hasValue) try { throw new InvalidOperationException(System.Reactive.Strings_Linq.NO_ELEMENTS); } catch (Exception exception) { _tcs.TrySetException(exception); } else _tcs.TrySetResult(_lastValue); _ctr.Dispose(); Dispose(); } private void Cancel() { Dispose(); _tcs.TrySetCanceled(_ct); } } public static IObservable<Unit> ToObservable(this Task task) { if (task == null) throw new ArgumentNullException("task"); return ToObservableImpl(task, null, false); } public static IObservable<Unit> ToObservable(this Task task, IScheduler scheduler) { if (scheduler == null) throw new ArgumentNullException("scheduler"); return task.ToObservable(new TaskObservationOptions(scheduler, false)); } public static IObservable<Unit> ToObservable(this Task task, TaskObservationOptions options) { if (task == null) throw new ArgumentNullException("task"); if (options == null) throw new ArgumentNullException("options"); return ToObservableImpl(task, options.Scheduler, options.IgnoreExceptionsAfterUnsubscribe); } internal static IObservable<Unit> ToObservable(this Task task, TaskObservationOptions.Value options) { if (task == null) throw new ArgumentNullException("task"); return ToObservableImpl(task, options.Scheduler, options.IgnoreExceptionsAfterUnsubscribe); } private static IObservable<Unit> ToObservableImpl(Task task, [Nullable(2)] IScheduler scheduler, bool ignoreExceptionsAfterUnsubscribe) { if (task.IsCompleted) { if (scheduler == null) scheduler = ImmediateScheduler.Instance; switch (task.Status) { case TaskStatus.Faulted: return new System.Reactive.Linq.ObservableImpl.Throw<Unit>(System.Reactive.Concurrency.TaskHelpers.GetSingleException(task), scheduler); case TaskStatus.Canceled: return new System.Reactive.Linq.ObservableImpl.Throw<Unit>(new TaskCanceledException(task), scheduler); default: return new System.Reactive.Linq.ObservableImpl.Return<Unit>(Unit.Default, scheduler); } } return new SlowTaskObservable(task, scheduler, ignoreExceptionsAfterUnsubscribe); } private static void EmitTaskResult(this Task task, IObserver<Unit> subject) { switch (task.Status) { case TaskStatus.RanToCompletion: subject.OnNext(Unit.Default); subject.OnCompleted(); break; case TaskStatus.Faulted: subject.OnError(System.Reactive.Concurrency.TaskHelpers.GetSingleException(task)); break; case TaskStatus.Canceled: subject.OnError(new TaskCanceledException(task)); break; } } internal static IDisposable Subscribe(this Task task, IObserver<Unit> observer) { if (task.IsCompleted) { task.EmitTaskResult(observer); return Disposable.Empty; } CancellationDisposable cancellationDisposable = new CancellationDisposable(); task.ContinueWith(delegate(Task t, object observerObject) { t.EmitTaskResult((IObserver<Unit>)observerObject); }, observer, cancellationDisposable.Token, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Current); return cancellationDisposable; } public static IObservable<TResult> ToObservable<[Nullable(2)] TResult>(this Task<TResult> task) { if (task == null) throw new ArgumentNullException("task"); return ToObservableImpl(task, null, false); } public static IObservable<TResult> ToObservable<[Nullable(2)] TResult>(this Task<TResult> task, IScheduler scheduler) { if (scheduler == null) throw new ArgumentNullException("scheduler"); return task.ToObservable(new TaskObservationOptions(scheduler, false)); } public static IObservable<TResult> ToObservable<[Nullable(2)] TResult>(this Task<TResult> task, TaskObservationOptions options) { if (task == null) throw new ArgumentNullException("task"); if (options == null) throw new ArgumentNullException("options"); return ToObservableImpl(task, options.Scheduler, options.IgnoreExceptionsAfterUnsubscribe); } internal static IObservable<TResult> ToObservable<[Nullable(2)] TResult>(this Task<TResult> task, TaskObservationOptions.Value options) { if (task == null) throw new ArgumentNullException("task"); return ToObservableImpl(task, options.Scheduler, options.IgnoreExceptionsAfterUnsubscribe); } private static IObservable<TResult> ToObservableImpl<[Nullable(2)] TResult>(Task<TResult> task, [Nullable(2)] IScheduler scheduler, bool ignoreExceptionsAfterUnsubscribe) { if (task.IsCompleted) { if (scheduler == null) scheduler = ImmediateScheduler.Instance; switch (task.Status) { case TaskStatus.Faulted: return new System.Reactive.Linq.ObservableImpl.Throw<TResult>(System.Reactive.Concurrency.TaskHelpers.GetSingleException(task), scheduler); case TaskStatus.Canceled: return new System.Reactive.Linq.ObservableImpl.Throw<TResult>((Exception)new TaskCanceledException(task), scheduler); default: return new System.Reactive.Linq.ObservableImpl.Return<TResult>(task.Result, scheduler); } } return new SlowTaskObservable<TResult>(task, scheduler, ignoreExceptionsAfterUnsubscribe); } private static void EmitTaskResult<[Nullable(2)] TResult>(this Task<TResult> task, IObserver<TResult> subject) { switch (task.Status) { case TaskStatus.RanToCompletion: subject.OnNext(task.Result); subject.OnCompleted(); break; case TaskStatus.Faulted: subject.OnError(System.Reactive.Concurrency.TaskHelpers.GetSingleException(task)); break; case TaskStatus.Canceled: subject.OnError((Exception)new TaskCanceledException(task)); break; } } [NullableContext(2)] private static TaskContinuationOptions GetTaskContinuationOptions(IScheduler scheduler) { TaskContinuationOptions taskContinuationOptions = TaskContinuationOptions.None; if (scheduler != null) taskContinuationOptions |= TaskContinuationOptions.ExecuteSynchronously; return taskContinuationOptions; } internal static IDisposable Subscribe<[Nullable(2)] TResult>(this Task<TResult> task, IObserver<TResult> observer) { if (task.IsCompleted) { task.EmitTaskResult(observer); return Disposable.Empty; } CancellationDisposable cancellationDisposable = new CancellationDisposable(); task.ContinueWith((Action<Task<TResult>, object>)delegate(Task<TResult> t, object observerObject) { TaskObservableExtensions.EmitTaskResult<TResult>(t, (IObserver<TResult>)observerObject); }, (object)observer, cancellationDisposable.Token, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Current); return cancellationDisposable; } public static Task<TResult> ToTask<[Nullable(2)] TResult>(this IObservable<TResult> observable) { if (observable == null) throw new ArgumentNullException("observable"); return ToTask(observable, default(CancellationToken), (object)null); } public static Task<TResult> ToTask<[Nullable(2)] TResult>(this IObservable<TResult> observable, IScheduler scheduler) { return ToTask(observable).ContinueOnScheduler(scheduler); } public static Task<TResult> ToTask<[Nullable(2)] TResult>(this IObservable<TResult> observable, [Nullable(2)] object state) { if (observable == null) throw new ArgumentNullException("observable"); return ToTask(observable, default(CancellationToken), state); } public static Task<TResult> ToTask<[Nullable(2)] TResult>(this IObservable<TResult> observable, [Nullable(2)] object state, IScheduler scheduler) { return ToTask(observable, default(CancellationToken), state).ContinueOnScheduler(scheduler); } public static Task<TResult> ToTask<[Nullable(2)] TResult>(this IObservable<TResult> observable, CancellationToken cancellationToken) { if (observable == null) throw new ArgumentNullException("observable"); return ToTask(observable, cancellationToken, (object)null); } public static Task<TResult> ToTask<[Nullable(2)] TResult>(this IObservable<TResult> observable, CancellationToken cancellationToken, IScheduler scheduler) { return ToTask(observable, cancellationToken, (object)null).ContinueOnScheduler(scheduler); } internal static Task<TResult> ContinueOnScheduler<[Nullable(2)] TResult>(this Task<TResult> task, IScheduler scheduler) { if (scheduler == null) throw new ArgumentNullException("scheduler"); TaskCompletionSource<TResult> taskCompletionSource = new TaskCompletionSource<TResult>(task.AsyncState); task.ContinueWith((Action<Task<TResult>, object>)delegate(Task<TResult> t, object o) { (IScheduler, TaskCompletionSource<TResult>) obj = ((IScheduler, TaskCompletionSource<TResult>))o; IScheduler item = obj.Item1; TaskCompletionSource<TResult> item2 = obj.Item2; Scheduler.ScheduleAction<(Task<TResult>, TaskCompletionSource<TResult>)>(item, (t, item2), (Action<(Task<TResult>, TaskCompletionSource<TResult>)>)delegate((Task<TResult> t, TaskCompletionSource<TResult> tcs) state) { if (state.t.IsCanceled) state.tcs.TrySetCanceled(new TaskCanceledException(state.t).CancellationToken); else if (state.t.IsFaulted) { state.tcs.TrySetException(System.Reactive.Concurrency.TaskHelpers.GetSingleException(state.t)); } else { state.tcs.TrySetResult(state.t.Result); } }); }, (object)(scheduler, taskCompletionSource), TaskContinuationOptions.ExecuteSynchronously); return taskCompletionSource.Task; } public static Task<TResult> ToTask<[Nullable(2)] TResult>(this IObservable<TResult> observable, CancellationToken cancellationToken, [Nullable(2)] object state) { if (observable == null) throw new ArgumentNullException("observable"); TaskCompletionSource<TResult> taskCompletionSource = new TaskCompletionSource<TResult>(state); ToTaskObserver<TResult> toTaskObserver = new ToTaskObserver<TResult>(taskCompletionSource, cancellationToken); try { ((System.Reactive.SafeObserver<TResult>)toTaskObserver).SetResource(observable.Subscribe((IObserver<TResult>)toTaskObserver)); } catch (Exception exception) { taskCompletionSource.TrySetException(exception); } return taskCompletionSource.Task; } public static Task<TResult> ToTask<[Nullable(2)] TResult>(this IObservable<TResult> observable, CancellationToken cancellationToken, [Nullable(2)] object state, IScheduler scheduler) { return ToTask(observable, cancellationToken, state).ContinueOnScheduler(scheduler); } } }