<PackageReference Include="Relativity.Transfer.Client" Version="7.2.7" />

TaskObservableExtensions

public static class TaskObservableExtensions
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Linq.ObservableImpl;
using System.Reactive.Subjects;
using System.Threading;
using System.Threading.Tasks;

namespace System.Reactive.Threading.Tasks
{
    /// <summary>
    /// Extension methods that convert tasks into observable sequences and observable sequences into tasks.
    /// </summary>
    public static class TaskObservableExtensions
    {
        /// <summary>
        /// Observer that remembers the most recent element it received and settles a
        /// <see cref="TaskCompletionSource{TResult}"/> when the sequence terminates or the
        /// cancellation token fires.
        /// </summary>
        /// <typeparam name="TResult">Element type of the observed sequence.</typeparam>
        private sealed class ToTaskObserver<TResult> : IObserver<TResult>
        {
            private readonly CancellationToken _ct;
            private readonly IDisposable _disposable;
            private readonly TaskCompletionSource<TResult> _tcs;
            private readonly CancellationTokenRegistration _ctr;

            private bool _hasValue;
            private TResult _lastValue;

            /// <summary>
            /// Creates the observer and, when the token can be canceled, registers the cancellation callback.
            /// </summary>
            /// <param name="tcs">Completion source settled by this observer.</param>
            /// <param name="disposable">Disposed on termination and on cancellation; the caller stores the subscription in it.</param>
            /// <param name="ct">Token whose cancellation cancels the task.</param>
            public ToTaskObserver(TaskCompletionSource<TResult> tcs, IDisposable disposable, CancellationToken ct)
            {
                _ct = ct;
                _tcs = tcs;
                _disposable = disposable;

                // Registered last: Cancel reads the fields assigned above, and Register runs the
                // callback synchronously when the token is already canceled.
                if (ct.CanBeCanceled)
                {
                    _ctr = ct.Register(Cancel);
                }
            }

            /// <summary>Records <paramref name="value"/> as the latest element; nothing is published yet.</summary>
            public void OnNext(TResult value)
            {
                _hasValue = true;
                _lastValue = value;
            }

            /// <summary>Faults the task with <paramref name="error"/>, then releases the registration and the subscription.</summary>
            public void OnError(Exception error)
            {
                _tcs.TrySetException(error);

                _ctr.Dispose();
                _disposable.Dispose();
            }

            /// <summary>
            /// Completes the task with the last element seen, or faults it with an
            /// <see cref="InvalidOperationException"/> when no element was received; then releases
            /// the registration and the subscription.
            /// </summary>
            public void OnCompleted()
            {
                if (_hasValue)
                {
                    _tcs.TrySetResult(_lastValue);
                }
                else
                {
                    _tcs.TrySetException(new InvalidOperationException(Strings_Linq.NO_ELEMENTS));
                }

                _ctr.Dispose();
                _disposable.Dispose();
            }

            /// <summary>Cancellation callback: disposes the subscription first, then marks the task as canceled.</summary>
            private void Cancel()
            {
                _disposable.Dispose();
                _tcs.TrySetCanceled(_ct);
            }
        }

        /// <summary>
        /// Returns an observable sequence that reports the outcome of <paramref name="task"/>.
        /// </summary>
        /// <param name="task">Task to convert.</param>
        /// <returns>
        /// For a task that is still running: a sequence that yields <see cref="Unit.Default"/> and completes
        /// when the task runs to completion, or signals an error when the task faults (the inner exception
        /// of <see cref="Task.Exception"/>) or is canceled (a <see cref="TaskCanceledException"/>).
        /// For an already completed task the same outcome is handed to a Return/Throw operator.
        /// </returns>
        /// <exception cref="ArgumentNullException"><paramref name="task"/> is null.</exception>
        public static IObservable<Unit> ToObservable(this Task task)
        {
            if (task == null)
            {
                throw new ArgumentNullException("task");
            }

            return ToObservableImpl(task, null);
        }

        /// <summary>
        /// Returns an observable sequence that reports the outcome of <paramref name="task"/>,
        /// using <paramref name="scheduler"/> for the notifications.
        /// </summary>
        /// <param name="task">Task to convert.</param>
        /// <param name="scheduler">Scheduler passed to the Return/Throw operator or to ObserveOn.</param>
        /// <returns>An observable sequence with the same outcomes as the overload without a scheduler.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="task"/> or <paramref name="scheduler"/> is null.</exception>
        public static IObservable<Unit> ToObservable(this Task task, IScheduler scheduler)
        {
            if (task == null)
            {
                throw new ArgumentNullException("task");
            }

            if (scheduler == null)
            {
                throw new ArgumentNullException("scheduler");
            }

            return ToObservableImpl(task, scheduler);
        }

        /// <summary>
        /// Chooses between the fast path (task already completed) and the continuation-based slow path.
        /// </summary>
        /// <param name="task">Task to convert; not null.</param>
        /// <param name="scheduler">Optional scheduler; null means none was requested.</param>
        private static IObservable<Unit> ToObservableImpl(Task task, IScheduler scheduler)
        {
            IObservable<Unit> result = null;

            if (task.IsCompleted)
            {
                // No subject is needed for a finished task. Return/Throw live in
                // System.Reactive.Linq.ObservableImpl; presumably they emit the value/error on the
                // given scheduler (their implementation is not visible from this file).
                scheduler = scheduler ?? ImmediateScheduler.Instance;

                switch (task.Status)
                {
                    case TaskStatus.RanToCompletion:
                        result = new Return<Unit>(Unit.Default, scheduler);
                        break;
                    case TaskStatus.Faulted:
                        result = new Throw<Unit>(task.Exception.InnerException, scheduler);
                        break;
                    case TaskStatus.Canceled:
                        result = new Throw<Unit>(new TaskCanceledException(task), scheduler);
                        break;
                }
            }
            else
            {
                result = ToObservableSlow(task, scheduler);
            }

            return result;
        }

        /// <summary>
        /// Bridges a still-running task to an <see cref="AsyncSubject{T}"/> through a continuation.
        /// </summary>
        private static IObservable<Unit> ToObservableSlow(Task task, IScheduler scheduler)
        {
            var subject = new AsyncSubject<Unit>();
            var options = GetTaskContinuationOptions(scheduler);

            task.ContinueWith(t => ToObservableDone(task, subject), options);

            return ToObservableResult(subject, scheduler);
        }

        /// <summary>
        /// Forwards the final status of <paramref name="task"/> to <paramref name="subject"/>.
        /// </summary>
        private static void ToObservableDone(Task task, IObserver<Unit> subject)
        {
            switch (task.Status)
            {
                case TaskStatus.RanToCompletion:
                    subject.OnNext(Unit.Default);
                    subject.OnCompleted();
                    break;
                case TaskStatus.Faulted:
                    subject.OnError(task.Exception.InnerException);
                    break;
                case TaskStatus.Canceled:
                    subject.OnError(new TaskCanceledException(task));
                    break;
            }
        }

        /// <summary>
        /// Returns an observable sequence that reports the result of <paramref name="task"/>.
        /// </summary>
        /// <typeparam name="TResult">Type of the task result.</typeparam>
        /// <param name="task">Task to convert.</param>
        /// <returns>
        /// For a task that is still running: a sequence that yields the task result and completes when the
        /// task runs to completion, or signals an error when the task faults (the inner exception of
        /// <see cref="Task.Exception"/>) or is canceled (a <see cref="TaskCanceledException"/>).
        /// For an already completed task the same outcome is handed to a Return/Throw operator.
        /// </returns>
        /// <exception cref="ArgumentNullException"><paramref name="task"/> is null.</exception>
        public static IObservable<TResult> ToObservable<TResult>(this Task<TResult> task)
        {
            if (task == null)
            {
                throw new ArgumentNullException("task");
            }

            return ToObservableImpl(task, null);
        }

        /// <summary>
        /// Returns an observable sequence that reports the result of <paramref name="task"/>,
        /// using <paramref name="scheduler"/> for the notifications.
        /// </summary>
        /// <typeparam name="TResult">Type of the task result.</typeparam>
        /// <param name="task">Task to convert.</param>
        /// <param name="scheduler">Scheduler passed to the Return/Throw operator or to ObserveOn.</param>
        /// <returns>An observable sequence with the same outcomes as the overload without a scheduler.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="task"/> or <paramref name="scheduler"/> is null.</exception>
        public static IObservable<TResult> ToObservable<TResult>(this Task<TResult> task, IScheduler scheduler)
        {
            if (task == null)
            {
                throw new ArgumentNullException("task");
            }

            if (scheduler == null)
            {
                throw new ArgumentNullException("scheduler");
            }

            return ToObservableImpl(task, scheduler);
        }

        /// <summary>
        /// Chooses between the fast path (task already completed) and the continuation-based slow path.
        /// </summary>
        /// <param name="task">Task to convert; not null.</param>
        /// <param name="scheduler">Optional scheduler; null means none was requested.</param>
        private static IObservable<TResult> ToObservableImpl<TResult>(Task<TResult> task, IScheduler scheduler)
        {
            IObservable<TResult> result = null;

            if (task.IsCompleted)
            {
                // Same fast path as the non-generic overload: a finished task needs no subject.
                scheduler = scheduler ?? ImmediateScheduler.Instance;

                switch (task.Status)
                {
                    case TaskStatus.RanToCompletion:
                        result = new Return<TResult>(task.Result, scheduler);
                        break;
                    case TaskStatus.Faulted:
                        result = new Throw<TResult>(task.Exception.InnerException, scheduler);
                        break;
                    case TaskStatus.Canceled:
                        result = new Throw<TResult>(new TaskCanceledException(task), scheduler);
                        break;
                }
            }
            else
            {
                result = ToObservableSlow(task, scheduler);
            }

            return result;
        }

        /// <summary>
        /// Bridges a still-running task to an <see cref="AsyncSubject{T}"/> through a continuation.
        /// </summary>
        private static IObservable<TResult> ToObservableSlow<TResult>(Task<TResult> task, IScheduler scheduler)
        {
            var subject = new AsyncSubject<TResult>();
            var options = GetTaskContinuationOptions(scheduler);

            task.ContinueWith(t => ToObservableDone<TResult>(task, subject), options);

            return ToObservableResult(subject, scheduler);
        }

        /// <summary>
        /// Forwards the final status of <paramref name="task"/> to <paramref name="subject"/>.
        /// </summary>
        private static void ToObservableDone<TResult>(Task<TResult> task, IObserver<TResult> subject)
        {
            switch (task.Status)
            {
                case TaskStatus.RanToCompletion:
                    subject.OnNext(task.Result);
                    subject.OnCompleted();
                    break;
                case TaskStatus.Faulted:
                    subject.OnError(task.Exception.InnerException);
                    break;
                case TaskStatus.Canceled:
                    subject.OnError(new TaskCanceledException(task));
                    break;
            }
        }

        /// <summary>
        /// Picks the continuation options: <see cref="TaskContinuationOptions.ExecuteSynchronously"/> when a
        /// scheduler was supplied, <see cref="TaskContinuationOptions.None"/> otherwise.
        /// </summary>
        private static TaskContinuationOptions GetTaskContinuationOptions(IScheduler scheduler)
        {
            var options = TaskContinuationOptions.None;

            // NOTE(review): presumably the synchronous continuation is safe here because the
            // notifications are re-dispatched through ObserveOn(scheduler) - confirm.
            if (scheduler != null)
            {
                options |= TaskContinuationOptions.ExecuteSynchronously;
            }

            return options;
        }

        /// <summary>
        /// Exposes <paramref name="subject"/> to callers: through ObserveOn when a scheduler was supplied,
        /// through AsObservable otherwise.
        /// </summary>
        private static IObservable<TResult> ToObservableResult<TResult>(AsyncSubject<TResult> subject, IScheduler scheduler)
        {
            if (scheduler != null)
            {
                return subject.ObserveOn(scheduler);
            }

            return subject.AsObservable();
        }

        /// <summary>
        /// Returns a task that completes with the last element of <paramref name="observable"/>.
        /// </summary>
        /// <typeparam name="TResult">Element type of the sequence.</typeparam>
        /// <param name="observable">Sequence to convert.</param>
        /// <returns>A task settled as described on the overload taking a token and a state object.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="observable"/> is null.</exception>
        public static Task<TResult> ToTask<TResult>(this IObservable<TResult> observable)
        {
            if (observable == null)
            {
                throw new ArgumentNullException("observable");
            }

            return observable.ToTask(default(CancellationToken), null);
        }

        /// <summary>
        /// Returns a task that completes with the last element of <paramref name="observable"/>.
        /// </summary>
        /// <typeparam name="TResult">Element type of the sequence.</typeparam>
        /// <param name="observable">Sequence to convert.</param>
        /// <param name="state">State object handed to the <see cref="TaskCompletionSource{TResult}"/> constructor.</param>
        /// <returns>A task settled as described on the overload taking a token and a state object.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="observable"/> is null.</exception>
        public static Task<TResult> ToTask<TResult>(this IObservable<TResult> observable, object state)
        {
            if (observable == null)
            {
                throw new ArgumentNullException("observable");
            }

            return observable.ToTask(default(CancellationToken), state);
        }

        /// <summary>
        /// Returns a task that completes with the last element of <paramref name="observable"/> and can be
        /// canceled through <paramref name="cancellationToken"/>.
        /// </summary>
        /// <typeparam name="TResult">Element type of the sequence.</typeparam>
        /// <param name="observable">Sequence to convert.</param>
        /// <param name="cancellationToken">Token that cancels the task and disposes the subscription.</param>
        /// <returns>A task settled as described on the overload taking a token and a state object.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="observable"/> is null.</exception>
        public static Task<TResult> ToTask<TResult>(this IObservable<TResult> observable, CancellationToken cancellationToken)
        {
            if (observable == null)
            {
                throw new ArgumentNullException("observable");
            }

            return observable.ToTask(cancellationToken, null);
        }

        /// <summary>
        /// Subscribes to <paramref name="observable"/> and returns a task that reflects how the sequence ends.
        /// </summary>
        /// <typeparam name="TResult">Element type of the sequence.</typeparam>
        /// <param name="observable">Sequence to convert.</param>
        /// <param name="cancellationToken">Token that cancels the task and disposes the subscription.</param>
        /// <param name="state">State object handed to the <see cref="TaskCompletionSource{TResult}"/> constructor.</param>
        /// <returns>
        /// A task that completes with the last element when the sequence completes, faults with
        /// <see cref="InvalidOperationException"/> when the sequence completes without elements, faults with
        /// the sequence's error or with an exception thrown while subscribing, and is canceled when
        /// <paramref name="cancellationToken"/> is canceled first.
        /// </returns>
        /// <exception cref="ArgumentNullException"><paramref name="observable"/> is null.</exception>
        public static Task<TResult> ToTask<TResult>(this IObservable<TResult> observable, CancellationToken cancellationToken, object state)
        {
            if (observable == null)
            {
                throw new ArgumentNullException("observable");
            }

            var tcs = new TaskCompletionSource<TResult>(state);
            var subscription = new SingleAssignmentDisposable();
            var observer = new ToTaskObserver<TResult>(tcs, subscription, cancellationToken);

            try
            {
                // If the observer has already terminated or been canceled, the holder is disposed
                // and this assignment disposes the new subscription on the spot, which may throw.
                subscription.Disposable = observable.Subscribe(observer);
            }
            catch (Exception exception)
            {
                // Never let the failure escape: the task must always be settled. Going through
                // OnError (rather than only TrySetException) also disposes the cancellation
                // registration and the subscription holder, which would otherwise stay rooted on
                // the token for as long as its source lives. Every call involved is idempotent, so
                // this is harmless when the observer has already terminated.
                observer.OnError(exception);
            }

            return tcs.Task;
        }
    }
}