TaskObservableExtensions
Provides a set of static methods for converting tasks to observable sequences.
            
                using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Reactive.Linq.ObservableImpl;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
namespace System.Reactive.Threading.Tasks
{
    [System.Runtime.CompilerServices.NullableContext(1)]
    [System.Runtime.CompilerServices.Nullable(0)]
    public static class TaskObservableExtensions
    {
        [System.Runtime.CompilerServices.Nullable(0)]
        private sealed class SlowTaskObservable : IObservable<Unit>
        {
            private readonly Task _task;
            [System.Runtime.CompilerServices.Nullable(2)]
            private readonly IScheduler _scheduler;
            private readonly bool _ignoreExceptionsAfterUnsubscribe;
            public SlowTaskObservable(Task task, [System.Runtime.CompilerServices.Nullable(2)] IScheduler scheduler, bool ignoreExceptionsAfterUnsubscribe)
            {
                _task = task;
                _scheduler = scheduler;
                _ignoreExceptionsAfterUnsubscribe = ignoreExceptionsAfterUnsubscribe;
            }
            public IDisposable Subscribe(IObserver<Unit> observer)
            {
                if (observer == null)
                    throw new ArgumentNullException("observer");
                CancellationDisposable cancellationDisposable = new CancellationDisposable();
                TaskContinuationOptions taskContinuationOptions = GetTaskContinuationOptions(_scheduler);
                if (_scheduler == null)
                    _task.ContinueWith(delegate(Task t, object subjectObject) {
                        t.EmitTaskResult((IObserver<Unit>)subjectObject);
                    }, observer, cancellationDisposable.Token, taskContinuationOptions, TaskScheduler.Current);
                else
                    _task.ContinueWithState(delegate(Task task, (IScheduler scheduler, IObserver<Unit> observer) tuple) {
                        tuple.scheduler.ScheduleAction((task, tuple.observer), delegate((Task task, IObserver<Unit> observer) tuple2) {
                            tuple2.task.EmitTaskResult(tuple2.observer);
                        });
                    }, (_scheduler, observer), taskContinuationOptions, cancellationDisposable.Token);
                if (_ignoreExceptionsAfterUnsubscribe)
                    _task.ContinueWith((Task t) => t.Exception, TaskContinuationOptions.OnlyOnFaulted);
                return cancellationDisposable;
            }
        }
        [System.Runtime.CompilerServices.Nullable(0)]
        private sealed class SlowTaskObservable<[System.Runtime.CompilerServices.Nullable(2)] TResult> : IObservable<TResult>
        {
            private readonly Task<TResult> _task;
            [System.Runtime.CompilerServices.Nullable(2)]
            private readonly IScheduler _scheduler;
            private readonly bool _ignoreExceptionsAfterUnsubscribe;
            public SlowTaskObservable(Task<TResult> task, [System.Runtime.CompilerServices.Nullable(2)] IScheduler scheduler, bool ignoreExceptionsAfterUnsubscribe)
            {
                _task = task;
                _scheduler = scheduler;
                _ignoreExceptionsAfterUnsubscribe = ignoreExceptionsAfterUnsubscribe;
            }
            public IDisposable Subscribe(IObserver<TResult> observer)
            {
                if (observer == null)
                    throw new ArgumentNullException("observer");
                CancellationDisposable cancellationDisposable = new CancellationDisposable();
                TaskContinuationOptions taskContinuationOptions = GetTaskContinuationOptions(_scheduler);
                if (_scheduler == null)
                    _task.ContinueWith(delegate(Task<TResult> t, object subjectObject) {
                        TaskObservableExtensions.EmitTaskResult<TResult>(t, (IObserver<TResult>)subjectObject);
                    }, observer, cancellationDisposable.Token, taskContinuationOptions, TaskScheduler.Current);
                else
                    System.Threading.Tasks.TaskExtensions.ContinueWithState<TResult, (IScheduler, IObserver<TResult>)>(_task, (Action<Task<TResult>, (IScheduler, IObserver<TResult>)>)delegate(Task<TResult> task, (IScheduler scheduler, IObserver<TResult> observer) tuple) {
                        Scheduler.ScheduleAction<(Task<TResult>, IObserver<TResult>)>(tuple.scheduler, (task, tuple.observer), (Action<(Task<TResult>, IObserver<TResult>)>)delegate((Task<TResult> task, IObserver<TResult> observer) tuple2) {
                            TaskObservableExtensions.EmitTaskResult<TResult>(tuple2.task, tuple2.observer);
                        });
                    }, (_scheduler, observer), taskContinuationOptions, cancellationDisposable.Token);
                if (_ignoreExceptionsAfterUnsubscribe)
                    _task.ContinueWith((Task<TResult> t) => t.Exception, TaskContinuationOptions.OnlyOnFaulted);
                return cancellationDisposable;
            }
        }
        [System.Runtime.CompilerServices.Nullable(new byte[] {
            0,
            1
        })]
        private sealed class ToTaskObserver<[System.Runtime.CompilerServices.Nullable(2)] TResult> : SafeObserver<TResult>
        {
            private readonly CancellationToken _ct;
            private readonly TaskCompletionSource<TResult> _tcs;
            private readonly CancellationTokenRegistration _ctr;
            private bool _hasValue;
            [System.Runtime.CompilerServices.Nullable(2)]
            private TResult _lastValue;
            public ToTaskObserver(TaskCompletionSource<TResult> tcs, CancellationToken ct)
            {
                _ct = ct;
                _tcs = tcs;
                if (ct.CanBeCanceled)
                    _ctr = ct.Register(delegate(object this) {
                        ((ToTaskObserver<TResult>)this).Cancel();
                    }, this);
            }
            public override void OnNext(TResult value)
            {
                _hasValue = true;
                _lastValue = value;
            }
            public override void OnError(Exception error)
            {
                _tcs.TrySetException(error);
                _ctr.Dispose();
                Dispose();
            }
            public override void OnCompleted()
            {
                if (!_hasValue)
                    try {
                        throw new InvalidOperationException(Strings_Linq.NO_ELEMENTS);
                    } catch (Exception exception) {
                        _tcs.TrySetException(exception);
                    }
                else
                    _tcs.TrySetResult(_lastValue);
                _ctr.Dispose();
                Dispose();
            }
            private void Cancel()
            {
                Dispose();
                _tcs.TrySetCanceled(_ct);
            }
        }
        public static IObservable<Unit> ToObservable(this Task task)
        {
            if (task == null)
                throw new ArgumentNullException("task");
            return ToObservableImpl(task, null, false);
        }
        public static IObservable<Unit> ToObservable(this Task task, IScheduler scheduler)
        {
            if (scheduler == null)
                throw new ArgumentNullException("scheduler");
            return task.ToObservable(new TaskObservationOptions(scheduler, false));
        }
        public static IObservable<Unit> ToObservable(this Task task, TaskObservationOptions options)
        {
            if (task == null)
                throw new ArgumentNullException("task");
            if (options == null)
                throw new ArgumentNullException("options");
            return ToObservableImpl(task, options.Scheduler, options.IgnoreExceptionsAfterUnsubscribe);
        }
        internal static IObservable<Unit> ToObservable(this Task task, TaskObservationOptions.Value options)
        {
            if (task == null)
                throw new ArgumentNullException("task");
            return ToObservableImpl(task, options.Scheduler, options.IgnoreExceptionsAfterUnsubscribe);
        }
        private static IObservable<Unit> ToObservableImpl(Task task, [System.Runtime.CompilerServices.Nullable(2)] IScheduler scheduler, bool ignoreExceptionsAfterUnsubscribe)
        {
            if (task.IsCompleted) {
                if (scheduler == null)
                    scheduler = ImmediateScheduler.Instance;
                switch (task.Status) {
                case TaskStatus.Faulted:
                    return new Throw<Unit>(task.GetSingleException(), scheduler);
                case TaskStatus.Canceled:
                    return new Throw<Unit>(new TaskCanceledException(task), scheduler);
                default:
                    return new Return<Unit>(Unit.Default, scheduler);
                }
            }
            return new SlowTaskObservable(task, scheduler, ignoreExceptionsAfterUnsubscribe);
        }
        private static void EmitTaskResult(this Task task, IObserver<Unit> subject)
        {
            switch (task.Status) {
            case TaskStatus.RanToCompletion:
                subject.OnNext(Unit.Default);
                subject.OnCompleted();
                break;
            case TaskStatus.Faulted:
                subject.OnError(task.GetSingleException());
                break;
            case TaskStatus.Canceled:
                subject.OnError(new TaskCanceledException(task));
                break;
            }
        }
        internal static IDisposable Subscribe(this Task task, IObserver<Unit> observer)
        {
            if (task.IsCompleted) {
                task.EmitTaskResult(observer);
                return Disposable.Empty;
            }
            CancellationDisposable cancellationDisposable = new CancellationDisposable();
            task.ContinueWith(delegate(Task t, object observerObject) {
                t.EmitTaskResult((IObserver<Unit>)observerObject);
            }, observer, cancellationDisposable.Token, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Current);
            return cancellationDisposable;
        }
        public static IObservable<TResult> ToObservable<[System.Runtime.CompilerServices.Nullable(2)] TResult>(this Task<TResult> task)
        {
            if (task == null)
                throw new ArgumentNullException("task");
            return ToObservableImpl(task, null, false);
        }
        public static IObservable<TResult> ToObservable<[System.Runtime.CompilerServices.Nullable(2)] TResult>(this Task<TResult> task, IScheduler scheduler)
        {
            if (scheduler == null)
                throw new ArgumentNullException("scheduler");
            return task.ToObservable(new TaskObservationOptions(scheduler, false));
        }
        public static IObservable<TResult> ToObservable<[System.Runtime.CompilerServices.Nullable(2)] TResult>(this Task<TResult> task, TaskObservationOptions options)
        {
            if (task == null)
                throw new ArgumentNullException("task");
            if (options == null)
                throw new ArgumentNullException("options");
            return ToObservableImpl(task, options.Scheduler, options.IgnoreExceptionsAfterUnsubscribe);
        }
        internal static IObservable<TResult> ToObservable<[System.Runtime.CompilerServices.Nullable(2)] TResult>(this Task<TResult> task, TaskObservationOptions.Value options)
        {
            if (task == null)
                throw new ArgumentNullException("task");
            return ToObservableImpl(task, options.Scheduler, options.IgnoreExceptionsAfterUnsubscribe);
        }
        private static IObservable<TResult> ToObservableImpl<[System.Runtime.CompilerServices.Nullable(2)] TResult>(Task<TResult> task, [System.Runtime.CompilerServices.Nullable(2)] IScheduler scheduler, bool ignoreExceptionsAfterUnsubscribe)
        {
            if (task.IsCompleted) {
                if (scheduler == null)
                    scheduler = ImmediateScheduler.Instance;
                switch (task.Status) {
                case TaskStatus.Faulted:
                    return new Throw<TResult>(task.GetSingleException(), scheduler);
                case TaskStatus.Canceled:
                    return new Throw<TResult>((Exception)new TaskCanceledException(task), scheduler);
                default:
                    return new Return<TResult>(task.Result, scheduler);
                }
            }
            return new SlowTaskObservable<TResult>(task, scheduler, ignoreExceptionsAfterUnsubscribe);
        }
        private static void EmitTaskResult<[System.Runtime.CompilerServices.Nullable(2)] TResult>(this Task<TResult> task, IObserver<TResult> subject)
        {
            switch (task.Status) {
            case TaskStatus.RanToCompletion:
                subject.OnNext(task.Result);
                subject.OnCompleted();
                break;
            case TaskStatus.Faulted:
                subject.OnError(task.GetSingleException());
                break;
            case TaskStatus.Canceled:
                subject.OnError((Exception)new TaskCanceledException(task));
                break;
            }
        }
        [System.Runtime.CompilerServices.NullableContext(2)]
        private static TaskContinuationOptions GetTaskContinuationOptions(IScheduler scheduler)
        {
            TaskContinuationOptions taskContinuationOptions = TaskContinuationOptions.None;
            if (scheduler != null)
                taskContinuationOptions |= TaskContinuationOptions.ExecuteSynchronously;
            return taskContinuationOptions;
        }
        internal static IDisposable Subscribe<[System.Runtime.CompilerServices.Nullable(2)] TResult>(this Task<TResult> task, IObserver<TResult> observer)
        {
            if (task.IsCompleted) {
                task.EmitTaskResult(observer);
                return Disposable.Empty;
            }
            CancellationDisposable cancellationDisposable = new CancellationDisposable();
            task.ContinueWith((Action<Task<TResult>, object>)delegate(Task<TResult> t, object observerObject) {
                TaskObservableExtensions.EmitTaskResult<TResult>(t, (IObserver<TResult>)observerObject);
            }, (object)observer, cancellationDisposable.Token, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Current);
            return cancellationDisposable;
        }
        public static Task<TResult> ToTask<[System.Runtime.CompilerServices.Nullable(2)] TResult>(this IObservable<TResult> observable)
        {
            if (observable == null)
                throw new ArgumentNullException("observable");
            return observable.ToTask(default(CancellationToken), (object)null);
        }
        public static Task<TResult> ToTask<[System.Runtime.CompilerServices.Nullable(2)] TResult>(this IObservable<TResult> observable, IScheduler scheduler)
        {
            return observable.ToTask().ContinueOnScheduler(scheduler);
        }
        public static Task<TResult> ToTask<[System.Runtime.CompilerServices.Nullable(2)] TResult>(this IObservable<TResult> observable, [System.Runtime.CompilerServices.Nullable(2)] object state)
        {
            if (observable == null)
                throw new ArgumentNullException("observable");
            return observable.ToTask(default(CancellationToken), state);
        }
        public static Task<TResult> ToTask<[System.Runtime.CompilerServices.Nullable(2)] TResult>(this IObservable<TResult> observable, [System.Runtime.CompilerServices.Nullable(2)] object state, IScheduler scheduler)
        {
            return observable.ToTask(default(CancellationToken), state).ContinueOnScheduler(scheduler);
        }
        public static Task<TResult> ToTask<[System.Runtime.CompilerServices.Nullable(2)] TResult>(this IObservable<TResult> observable, CancellationToken cancellationToken)
        {
            if (observable == null)
                throw new ArgumentNullException("observable");
            return observable.ToTask(cancellationToken, (object)null);
        }
        public static Task<TResult> ToTask<[System.Runtime.CompilerServices.Nullable(2)] TResult>(this IObservable<TResult> observable, CancellationToken cancellationToken, IScheduler scheduler)
        {
            return observable.ToTask(cancellationToken, (object)null).ContinueOnScheduler(scheduler);
        }
        internal static Task<TResult> ContinueOnScheduler<[System.Runtime.CompilerServices.Nullable(2)] TResult>(this Task<TResult> task, IScheduler scheduler)
        {
            if (scheduler == null)
                throw new ArgumentNullException("scheduler");
            TaskCompletionSource<TResult> taskCompletionSource = new TaskCompletionSource<TResult>(task.AsyncState);
            task.ContinueWith((Action<Task<TResult>, object>)delegate(Task<TResult> t, object o) {
                (IScheduler, TaskCompletionSource<TResult>) obj = ((IScheduler, TaskCompletionSource<TResult>))o;
                IScheduler item = obj.Item1;
                TaskCompletionSource<TResult> item2 = obj.Item2;
                Scheduler.ScheduleAction<(Task<TResult>, TaskCompletionSource<TResult>)>(item, (t, item2), (Action<(Task<TResult>, TaskCompletionSource<TResult>)>)delegate((Task<TResult> t, TaskCompletionSource<TResult> tcs) state) {
                    if (state.t.IsCanceled)
                        state.tcs.TrySetCanceled(new TaskCanceledException(state.t).CancellationToken);
                    else if (state.t.IsFaulted) {
                        state.tcs.TrySetException(state.t.GetSingleException());
                    } else {
                        state.tcs.TrySetResult(state.t.Result);
                    }
                });
            }, (object)(scheduler, taskCompletionSource), TaskContinuationOptions.ExecuteSynchronously);
            return taskCompletionSource.Task;
        }
        public static Task<TResult> ToTask<[System.Runtime.CompilerServices.Nullable(2)] TResult>(this IObservable<TResult> observable, CancellationToken cancellationToken, [System.Runtime.CompilerServices.Nullable(2)] object state)
        {
            if (observable == null)
                throw new ArgumentNullException("observable");
            TaskCompletionSource<TResult> taskCompletionSource = new TaskCompletionSource<TResult>(state);
            ToTaskObserver<TResult> toTaskObserver = new ToTaskObserver<TResult>(taskCompletionSource, cancellationToken);
            try {
                ((SafeObserver<TResult>)toTaskObserver).SetResource(observable.Subscribe((IObserver<TResult>)toTaskObserver));
            } catch (Exception exception) {
                taskCompletionSource.TrySetException(exception);
            }
            return taskCompletionSource.Task;
        }
        public static Task<TResult> ToTask<[System.Runtime.CompilerServices.Nullable(2)] TResult>(this IObservable<TResult> observable, CancellationToken cancellationToken, [System.Runtime.CompilerServices.Nullable(2)] object state, IScheduler scheduler)
        {
            return observable.ToTask(cancellationToken, state).ContinueOnScheduler(scheduler);
        }
    }
}