<PackageReference Include="System.Reactive" Version="5.0.0-preview.220" />

AsyncInfoObservable

public static class AsyncInfoObservable
Provides a set of extension methods to expose observable sequences as Windows Runtime asynchronous actions and operations.
using System.Reactive.Disposables;
using System.Reactive.Subjects;
using System.Reactive.Threading.Tasks;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices.WindowsRuntime;
using System.Threading;
using System.Threading.Tasks;
using Windows.Foundation;

// Nullable annotations are expressed through the nullable context rather than through
// explicit [NullableContext]/[Nullable] attributes: those attributes are reserved for the
// compiler and are rejected when written in source.
#nullable enable

namespace System.Reactive.Linq
{
    /// <summary>
    /// Provides a set of extension methods to expose observable sequences as Windows Runtime
    /// asynchronous actions and operations.
    /// </summary>
    [CLSCompliant(false)]
    public static class AsyncInfoObservable
    {
        /// <summary>
        /// Wraps the observable sequence in a Windows Runtime asynchronous action.
        /// </summary>
        /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
        /// <param name="source">Source sequence to expose as an asynchronous action.</param>
        /// <returns>
        /// An action backed by the task obtained from the sequence; the cancellation token handed
        /// out by <c>AsyncInfo.Run</c> is forwarded to that task.
        /// </returns>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
        public static IAsyncAction ToAsyncAction<TSource>(this IObservable<TSource> source)
        {
            if (source is null)
            {
                throw new ArgumentNullException(nameof(source));
            }

            // DefaultIfEmpty supplies an element when the source produces none, so the task has
            // a value to complete with. The cast to the non-generic Task selects the
            // AsyncInfo.Run overload that yields an IAsyncAction instead of an IAsyncOperation.
            return AsyncInfo.Run(ct => (Task)source.DefaultIfEmpty().ToTask(ct));
        }

        /// <summary>
        /// Wraps the observable sequence in a Windows Runtime asynchronous action that reports
        /// progress for every element the source produces.
        /// </summary>
        /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
        /// <param name="source">Source sequence to expose as an asynchronous action.</param>
        /// <returns>
        /// An action whose progress value is the running, 1-based count of elements observed so far.
        /// </returns>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
        public static IAsyncActionWithProgress<int> ToAsyncActionWithProgress<TSource>(this IObservable<TSource> source)
        {
            if (source is null)
            {
                throw new ArgumentNullException(nameof(source));
            }

            return AsyncInfo.Run<int>((ct, progress) =>
            {
                // The counter lives inside the delegate, so every run of the action starts at zero.
                var count = 0;

                return source
                    .Do(_ => progress.Report(++count))
                    .DefaultIfEmpty()
                    .ToTask(ct);
            });
        }

        /// <summary>
        /// Wraps the observable sequence in a Windows Runtime asynchronous action whose progress
        /// notifications are produced by a caller-supplied projection of the source.
        /// </summary>
        /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
        /// <typeparam name="TProgress">The type of the progress values.</typeparam>
        /// <param name="source">Source sequence to expose as an asynchronous action.</param>
        /// <param name="progressSelector">
        /// Function that maps the (published) source sequence onto a sequence of progress values.
        /// </param>
        /// <returns>An action that forwards the elements of the progress sequence as progress reports.</returns>
        /// <exception cref="ArgumentNullException">
        /// <paramref name="source"/> or <paramref name="progressSelector"/> is null.
        /// </exception>
        public static IAsyncActionWithProgress<TProgress> ToAsyncActionWithProgress<TSource, TProgress>(this IObservable<TSource> source, Func<IObservable<TSource>, IObservable<TProgress>> progressSelector)
        {
            if (source is null)
            {
                throw new ArgumentNullException(nameof(source));
            }

            if (progressSelector is null)
            {
                throw new ArgumentNullException(nameof(progressSelector));
            }

            return AsyncInfo.Run<TProgress>((ct, progress) =>
                Observable.Create<TSource>(observer =>
                {
                    // Two subscriptions (progress errors and data) feed the same observer, so it
                    // is wrapped in a synchronized observer.
                    var synchronized = Observer.Synchronize(observer);

                    // Publish shares a single connection to the source between both consumers.
                    var published = source.Publish();

                    var progressSubscription = progressSelector(published).Subscribe(progress.Report, synchronized.OnError);
                    var dataSubscription = published.DefaultIfEmpty().Subscribe(synchronized);

                    // Connect only after both consumers are attached.
                    var connection = published.Connect();

                    return StableCompositeDisposable.CreateTrusted(progressSubscription, dataSubscription, connection);
                }).ToTask(ct));
        }

        /// <summary>
        /// Wraps the observable sequence in a Windows Runtime asynchronous operation whose result
        /// is taken from the task obtained from the sequence.
        /// </summary>
        /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
        /// <param name="source">Source sequence to expose as an asynchronous operation.</param>
        /// <returns>An operation backed by <c>source.ToTask(ct)</c>.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
        public static IAsyncOperation<TSource> ToAsyncOperation<TSource>(this IObservable<TSource> source)
        {
            if (source is null)
            {
                throw new ArgumentNullException(nameof(source));
            }

            // No DefaultIfEmpty here: unlike the action variants, the source is converted as-is.
            return AsyncInfo.Run<TSource>(ct => source.ToTask(ct));
        }

        /// <summary>
        /// Wraps the observable sequence in a Windows Runtime asynchronous operation that reports
        /// progress for every element the source produces.
        /// </summary>
        /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
        /// <param name="source">Source sequence to expose as an asynchronous operation.</param>
        /// <returns>
        /// An operation whose progress value is the running, 1-based count of elements observed so far.
        /// </returns>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
        public static IAsyncOperationWithProgress<TSource, int> ToAsyncOperationWithProgress<TSource>(this IObservable<TSource> source)
        {
            if (source is null)
            {
                throw new ArgumentNullException(nameof(source));
            }

            return AsyncInfo.Run<TSource, int>((ct, progress) =>
            {
                // The counter lives inside the delegate, so every run of the operation starts at zero.
                var count = 0;

                return source
                    .Do(_ => progress.Report(++count))
                    .ToTask(ct);
            });
        }

        /// <summary>
        /// Wraps the observable sequence in a Windows Runtime asynchronous operation whose result
        /// comes from a caller-supplied projection, reporting progress for every source element.
        /// </summary>
        /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
        /// <typeparam name="TResult">The type of the elements in the result sequence.</typeparam>
        /// <param name="source">Source sequence to compute a result over.</param>
        /// <param name="resultSelector">
        /// Function that maps the source sequence (instrumented with progress reporting) onto the
        /// sequence the operation's result is taken from.
        /// </param>
        /// <returns>
        /// An operation whose progress value is the running, 1-based count of source elements
        /// observed so far.
        /// </returns>
        /// <exception cref="ArgumentNullException">
        /// <paramref name="source"/> or <paramref name="resultSelector"/> is null.
        /// </exception>
        public static IAsyncOperationWithProgress<TResult, int> ToAsyncOperationWithProgress<TSource, TResult>(this IObservable<TSource> source, Func<IObservable<TSource>, IObservable<TResult>> resultSelector)
        {
            if (source is null)
            {
                throw new ArgumentNullException(nameof(source));
            }

            if (resultSelector is null)
            {
                throw new ArgumentNullException(nameof(resultSelector));
            }

            return AsyncInfo.Run<TResult, int>((ct, progress) =>
            {
                // The counter lives inside the delegate, so every run of the operation starts at zero.
                var count = 0;
                var counted = source.Do(_ => progress.Report(++count));

                return resultSelector(counted).ToTask(ct);
            });
        }

        /// <summary>
        /// Wraps the observable sequence in a Windows Runtime asynchronous operation whose result
        /// and progress notifications both come from caller-supplied projections of the source.
        /// </summary>
        /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
        /// <typeparam name="TResult">The type of the elements in the result sequence.</typeparam>
        /// <typeparam name="TProgress">The type of the progress values.</typeparam>
        /// <param name="source">Source sequence to compute a result over.</param>
        /// <param name="resultSelector">
        /// Function that maps the (published) source sequence onto the sequence the operation's
        /// result is taken from.
        /// </param>
        /// <param name="progressSelector">
        /// Function that maps the (published) source sequence onto a sequence of progress values.
        /// </param>
        /// <returns>An operation that forwards the elements of the progress sequence as progress reports.</returns>
        /// <exception cref="ArgumentNullException">
        /// <paramref name="source"/>, <paramref name="resultSelector"/> or
        /// <paramref name="progressSelector"/> is null.
        /// </exception>
        public static IAsyncOperationWithProgress<TResult, TProgress> ToAsyncOperationWithProgress<TSource, TResult, TProgress>(this IObservable<TSource> source, Func<IObservable<TSource>, IObservable<TResult>> resultSelector, Func<IObservable<TSource>, IObservable<TProgress>> progressSelector)
        {
            if (source is null)
            {
                throw new ArgumentNullException(nameof(source));
            }

            if (resultSelector is null)
            {
                throw new ArgumentNullException(nameof(resultSelector));
            }

            if (progressSelector is null)
            {
                throw new ArgumentNullException(nameof(progressSelector));
            }

            return AsyncInfo.Run<TResult, TProgress>((ct, progress) =>
                Observable.Create<TResult>(observer =>
                {
                    // Two subscriptions (progress errors and results) feed the same observer, so
                    // it is wrapped in a synchronized observer.
                    var synchronized = Observer.Synchronize(observer);

                    // Publish shares a single connection to the source between both selectors.
                    var published = source.Publish();

                    var progressSubscription = progressSelector(published).Subscribe(progress.Report, synchronized.OnError);
                    var resultSubscription = resultSelector(published).Subscribe(synchronized);

                    // Connect only after both consumers are attached.
                    var connection = published.Connect();

                    return StableCompositeDisposable.CreateTrusted(progressSubscription, resultSubscription, connection);
                }).ToTask(ct));
        }
    }
}