<PackageReference Include="System.Reactive" Version="6.0.1" />

TaskObservableExtensions

public static class TaskObservableExtensions
Provides a set of static methods for converting tasks to observable sequences.
using System.Reactive.Concurrency; using System.Reactive.Disposables; using System.Reactive.Linq.ObservableImpl; using System.Runtime.CompilerServices; using System.Threading; using System.Threading.Tasks; namespace System.Reactive.Threading.Tasks { [System.Runtime.CompilerServices.NullableContext(1)] [System.Runtime.CompilerServices.Nullable(0)] public static class TaskObservableExtensions { [System.Runtime.CompilerServices.Nullable(0)] private sealed class SlowTaskObservable : IObservable<Unit> { private readonly Task _task; [System.Runtime.CompilerServices.Nullable(2)] private readonly IScheduler _scheduler; private readonly bool _ignoreExceptionsAfterUnsubscribe; public SlowTaskObservable(Task task, [System.Runtime.CompilerServices.Nullable(2)] IScheduler scheduler, bool ignoreExceptionsAfterUnsubscribe) { _task = task; _scheduler = scheduler; _ignoreExceptionsAfterUnsubscribe = ignoreExceptionsAfterUnsubscribe; } public IDisposable Subscribe(IObserver<Unit> observer) { if (observer == null) throw new ArgumentNullException("observer"); CancellationDisposable cancellationDisposable = new CancellationDisposable(); TaskContinuationOptions taskContinuationOptions = GetTaskContinuationOptions(_scheduler); if (_scheduler == null) _task.ContinueWith(delegate(Task t, object subjectObject) { t.EmitTaskResult((IObserver<Unit>)subjectObject); }, observer, cancellationDisposable.Token, taskContinuationOptions, TaskScheduler.Current); else _task.ContinueWithState(delegate(Task task, (IScheduler scheduler, IObserver<Unit> observer) tuple) { tuple.scheduler.ScheduleAction((task, tuple.observer), delegate((Task task, IObserver<Unit> observer) tuple2) { tuple2.task.EmitTaskResult(tuple2.observer); }); }, (_scheduler, observer), taskContinuationOptions, cancellationDisposable.Token); if (_ignoreExceptionsAfterUnsubscribe) _task.ContinueWith((Task t) => t.Exception, TaskContinuationOptions.OnlyOnFaulted); return cancellationDisposable; } } [System.Runtime.CompilerServices.Nullable(0)] private sealed class SlowTaskObservable<[System.Runtime.CompilerServices.Nullable(2)] TResult> : IObservable<TResult> { private readonly Task<TResult> _task; [System.Runtime.CompilerServices.Nullable(2)] private readonly IScheduler _scheduler; private readonly bool _ignoreExceptionsAfterUnsubscribe; public SlowTaskObservable(Task<TResult> task, [System.Runtime.CompilerServices.Nullable(2)] IScheduler scheduler, bool ignoreExceptionsAfterUnsubscribe) { _task = task; _scheduler = scheduler; _ignoreExceptionsAfterUnsubscribe = ignoreExceptionsAfterUnsubscribe; } public IDisposable Subscribe(IObserver<TResult> observer) { if (observer == null) throw new ArgumentNullException("observer"); CancellationDisposable cancellationDisposable = new CancellationDisposable(); TaskContinuationOptions taskContinuationOptions = GetTaskContinuationOptions(_scheduler); if (_scheduler == null) _task.ContinueWith(delegate(Task<TResult> t, object subjectObject) { TaskObservableExtensions.EmitTaskResult<TResult>(t, (IObserver<TResult>)subjectObject); }, observer, cancellationDisposable.Token, taskContinuationOptions, TaskScheduler.Current); else System.Threading.Tasks.TaskExtensions.ContinueWithState<TResult, (IScheduler, IObserver<TResult>)>(_task, (Action<Task<TResult>, (IScheduler, IObserver<TResult>)>)delegate(Task<TResult> task, (IScheduler scheduler, IObserver<TResult> observer) tuple) { Scheduler.ScheduleAction<(Task<TResult>, IObserver<TResult>)>(tuple.scheduler, (task, tuple.observer), (Action<(Task<TResult>, IObserver<TResult>)>)delegate((Task<TResult> task, IObserver<TResult> observer) tuple2) { TaskObservableExtensions.EmitTaskResult<TResult>(tuple2.task, tuple2.observer); }); }, (_scheduler, observer), taskContinuationOptions, cancellationDisposable.Token); if (_ignoreExceptionsAfterUnsubscribe) _task.ContinueWith((Task<TResult> t) => t.Exception, TaskContinuationOptions.OnlyOnFaulted); return cancellationDisposable; } } [System.Runtime.CompilerServices.Nullable(new byte[] { 0, 1 })] private sealed class ToTaskObserver<[System.Runtime.CompilerServices.Nullable(2)] TResult> : SafeObserver<TResult> { private readonly CancellationToken _ct; private readonly TaskCompletionSource<TResult> _tcs; private readonly CancellationTokenRegistration _ctr; private bool _hasValue; [System.Runtime.CompilerServices.Nullable(2)] private TResult _lastValue; public ToTaskObserver(TaskCompletionSource<TResult> tcs, CancellationToken ct) { _ct = ct; _tcs = tcs; if (ct.CanBeCanceled) _ctr = ct.Register(delegate(object this) { ((ToTaskObserver<TResult>)this).Cancel(); }, this); } public override void OnNext(TResult value) { _hasValue = true; _lastValue = value; } public override void OnError(Exception error) { _tcs.TrySetException(error); _ctr.Dispose(); Dispose(); } public override void OnCompleted() { if (!_hasValue) try { throw new InvalidOperationException(Strings_Linq.NO_ELEMENTS); } catch (Exception exception) { _tcs.TrySetException(exception); } else _tcs.TrySetResult(_lastValue); _ctr.Dispose(); Dispose(); } private void Cancel() { Dispose(); _tcs.TrySetCanceled(_ct); } } public static IObservable<Unit> ToObservable(this Task task) { if (task == null) throw new ArgumentNullException("task"); return ToObservableImpl(task, null, false); } public static IObservable<Unit> ToObservable(this Task task, IScheduler scheduler) { if (scheduler == null) throw new ArgumentNullException("scheduler"); return task.ToObservable(new TaskObservationOptions(scheduler, false)); } public static IObservable<Unit> ToObservable(this Task task, TaskObservationOptions options) { if (task == null) throw new ArgumentNullException("task"); if (options == null) throw new ArgumentNullException("options"); return ToObservableImpl(task, options.Scheduler, options.IgnoreExceptionsAfterUnsubscribe); } internal static IObservable<Unit> ToObservable(this Task task, TaskObservationOptions.Value options) { if (task == null) throw new ArgumentNullException("task"); return ToObservableImpl(task, options.Scheduler, options.IgnoreExceptionsAfterUnsubscribe); } private static IObservable<Unit> ToObservableImpl(Task task, [System.Runtime.CompilerServices.Nullable(2)] IScheduler scheduler, bool ignoreExceptionsAfterUnsubscribe) { if (task.IsCompleted) { if (scheduler == null) scheduler = ImmediateScheduler.Instance; switch (task.Status) { case TaskStatus.Faulted: return new Throw<Unit>(task.GetSingleException(), scheduler); case TaskStatus.Canceled: return new Throw<Unit>(new TaskCanceledException(task), scheduler); default: return new Return<Unit>(Unit.Default, scheduler); } } return new SlowTaskObservable(task, scheduler, ignoreExceptionsAfterUnsubscribe); } private static void EmitTaskResult(this Task task, IObserver<Unit> subject) { switch (task.Status) { case TaskStatus.RanToCompletion: subject.OnNext(Unit.Default); subject.OnCompleted(); break; case TaskStatus.Faulted: subject.OnError(task.GetSingleException()); break; case TaskStatus.Canceled: subject.OnError(new TaskCanceledException(task)); break; } } internal static IDisposable Subscribe(this Task task, IObserver<Unit> observer) { if (task.IsCompleted) { task.EmitTaskResult(observer); return Disposable.Empty; } CancellationDisposable cancellationDisposable = new CancellationDisposable(); task.ContinueWith(delegate(Task t, object observerObject) { t.EmitTaskResult((IObserver<Unit>)observerObject); }, observer, cancellationDisposable.Token, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Current); return cancellationDisposable; } public static IObservable<TResult> ToObservable<[System.Runtime.CompilerServices.Nullable(2)] TResult>(this Task<TResult> task) { if (task == null) throw new ArgumentNullException("task"); return ToObservableImpl(task, null, false); } public static IObservable<TResult> ToObservable<[System.Runtime.CompilerServices.Nullable(2)] TResult>(this Task<TResult> task, IScheduler scheduler) { if (scheduler == null) throw new ArgumentNullException("scheduler"); return task.ToObservable(new TaskObservationOptions(scheduler, false)); } public static IObservable<TResult> ToObservable<[System.Runtime.CompilerServices.Nullable(2)] TResult>(this Task<TResult> task, TaskObservationOptions options) { if (task == null) throw new ArgumentNullException("task"); if (options == null) throw new ArgumentNullException("options"); return ToObservableImpl(task, options.Scheduler, options.IgnoreExceptionsAfterUnsubscribe); } internal static IObservable<TResult> ToObservable<[System.Runtime.CompilerServices.Nullable(2)] TResult>(this Task<TResult> task, TaskObservationOptions.Value options) { if (task == null) throw new ArgumentNullException("task"); return ToObservableImpl(task, options.Scheduler, options.IgnoreExceptionsAfterUnsubscribe); } private static IObservable<TResult> ToObservableImpl<[System.Runtime.CompilerServices.Nullable(2)] TResult>(Task<TResult> task, [System.Runtime.CompilerServices.Nullable(2)] IScheduler scheduler, bool ignoreExceptionsAfterUnsubscribe) { if (task.IsCompleted) { if (scheduler == null) scheduler = ImmediateScheduler.Instance; switch (task.Status) { case TaskStatus.Faulted: return new Throw<TResult>(task.GetSingleException(), scheduler); case TaskStatus.Canceled: return new Throw<TResult>((Exception)new TaskCanceledException(task), scheduler); default: return new Return<TResult>(task.Result, scheduler); } } return new SlowTaskObservable<TResult>(task, scheduler, ignoreExceptionsAfterUnsubscribe); } private static void EmitTaskResult<[System.Runtime.CompilerServices.Nullable(2)] TResult>(this Task<TResult> task, IObserver<TResult> subject) { switch (task.Status) { case TaskStatus.RanToCompletion: subject.OnNext(task.Result); subject.OnCompleted(); break; case TaskStatus.Faulted: subject.OnError(task.GetSingleException()); break; case TaskStatus.Canceled: subject.OnError((Exception)new TaskCanceledException(task)); break; } } [System.Runtime.CompilerServices.NullableContext(2)] private static TaskContinuationOptions GetTaskContinuationOptions(IScheduler scheduler) { TaskContinuationOptions taskContinuationOptions = TaskContinuationOptions.None; if (scheduler != null) taskContinuationOptions |= TaskContinuationOptions.ExecuteSynchronously; return taskContinuationOptions; } internal static IDisposable Subscribe<[System.Runtime.CompilerServices.Nullable(2)] TResult>(this Task<TResult> task, IObserver<TResult> observer) { if (task.IsCompleted) { task.EmitTaskResult(observer); return Disposable.Empty; } CancellationDisposable cancellationDisposable = new CancellationDisposable(); task.ContinueWith((Action<Task<TResult>, object>)delegate(Task<TResult> t, object observerObject) { TaskObservableExtensions.EmitTaskResult<TResult>(t, (IObserver<TResult>)observerObject); }, (object)observer, cancellationDisposable.Token, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Current); return cancellationDisposable; } public static Task<TResult> ToTask<[System.Runtime.CompilerServices.Nullable(2)] TResult>(this IObservable<TResult> observable) { if (observable == null) throw new ArgumentNullException("observable"); return observable.ToTask(default(CancellationToken), (object)null); } public static Task<TResult> ToTask<[System.Runtime.CompilerServices.Nullable(2)] TResult>(this IObservable<TResult> observable, IScheduler scheduler) { return observable.ToTask().ContinueOnScheduler(scheduler); } public static Task<TResult> ToTask<[System.Runtime.CompilerServices.Nullable(2)] TResult>(this IObservable<TResult> observable, [System.Runtime.CompilerServices.Nullable(2)] object state) { if (observable == null) throw new ArgumentNullException("observable"); return observable.ToTask(default(CancellationToken), state); } public static Task<TResult> ToTask<[System.Runtime.CompilerServices.Nullable(2)] TResult>(this IObservable<TResult> observable, [System.Runtime.CompilerServices.Nullable(2)] object state, IScheduler scheduler) { return observable.ToTask(default(CancellationToken), state).ContinueOnScheduler(scheduler); } public static Task<TResult> ToTask<[System.Runtime.CompilerServices.Nullable(2)] TResult>(this IObservable<TResult> observable, CancellationToken cancellationToken) { if (observable == null) throw new ArgumentNullException("observable"); return observable.ToTask(cancellationToken, (object)null); } public static Task<TResult> ToTask<[System.Runtime.CompilerServices.Nullable(2)] TResult>(this IObservable<TResult> observable, CancellationToken cancellationToken, IScheduler scheduler) { return observable.ToTask(cancellationToken, (object)null).ContinueOnScheduler(scheduler); } internal static Task<TResult> ContinueOnScheduler<[System.Runtime.CompilerServices.Nullable(2)] TResult>(this Task<TResult> task, IScheduler scheduler) { if (scheduler == null) throw new ArgumentNullException("scheduler"); TaskCompletionSource<TResult> taskCompletionSource = new TaskCompletionSource<TResult>(task.AsyncState); task.ContinueWith((Action<Task<TResult>, object>)delegate(Task<TResult> t, object o) { (IScheduler, TaskCompletionSource<TResult>) obj = ((IScheduler, TaskCompletionSource<TResult>))o; IScheduler item = obj.Item1; TaskCompletionSource<TResult> item2 = obj.Item2; Scheduler.ScheduleAction<(Task<TResult>, TaskCompletionSource<TResult>)>(item, (t, item2), (Action<(Task<TResult>, TaskCompletionSource<TResult>)>)delegate((Task<TResult> t, TaskCompletionSource<TResult> tcs) state) { if (state.t.IsCanceled) state.tcs.TrySetCanceled(new TaskCanceledException(state.t).CancellationToken); else if (state.t.IsFaulted) { state.tcs.TrySetException(state.t.GetSingleException()); } else { state.tcs.TrySetResult(state.t.Result); } }); }, (object)(scheduler, taskCompletionSource), TaskContinuationOptions.ExecuteSynchronously); return taskCompletionSource.Task; } public static Task<TResult> ToTask<[System.Runtime.CompilerServices.Nullable(2)] TResult>(this IObservable<TResult> observable, CancellationToken cancellationToken, [System.Runtime.CompilerServices.Nullable(2)] object state) { if (observable == null) throw new ArgumentNullException("observable"); TaskCompletionSource<TResult> taskCompletionSource = new TaskCompletionSource<TResult>(state); ToTaskObserver<TResult> toTaskObserver = new ToTaskObserver<TResult>(taskCompletionSource, cancellationToken); try { ((SafeObserver<TResult>)toTaskObserver).SetResource(observable.Subscribe((IObserver<TResult>)toTaskObserver)); } catch (Exception exception) { taskCompletionSource.TrySetException(exception); } return taskCompletionSource.Task; } public static Task<TResult> ToTask<[System.Runtime.CompilerServices.Nullable(2)] TResult>(this IObservable<TResult> observable, CancellationToken cancellationToken, [System.Runtime.CompilerServices.Nullable(2)] object state, IScheduler scheduler) { return observable.ToTask(cancellationToken, state).ContinueOnScheduler(scheduler); } } }