AsyncInfoObservable
Provides a set of extension methods to expose observable sequences as Windows Runtime asynchronous actions and operations.
            
                using System.Reactive.Disposables;
using System.Reactive.Subjects;
using System.Reactive.Threading.Tasks;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices.WindowsRuntime;
using System.Threading;
using System.Threading.Tasks;
using Windows.Foundation;
namespace System.Reactive.Linq
{
    [System.Runtime.CompilerServices.NullableContext(1)]
    [System.Runtime.CompilerServices.Nullable(0)]
    [CLSCompliant(false)]
    public static class AsyncInfoObservable
    {
        public static IAsyncAction ToAsyncAction<[System.Runtime.CompilerServices.Nullable(2)] TSource>(this IObservable<TSource> source)
        {
            if (source == null)
                throw new ArgumentNullException("source");
            return AsyncInfo.Run((Func<CancellationToken, Task>)((CancellationToken ct) => TaskObservableExtensions.ToTask<TSource>(Observable.DefaultIfEmpty<TSource>(source), ct)));
        }
        public static IAsyncActionWithProgress<int> ToAsyncActionWithProgress<[System.Runtime.CompilerServices.Nullable(2)] TSource>(this IObservable<TSource> source)
        {
            if (source == null)
                throw new ArgumentNullException("source");
            return AsyncInfo.Run<int>((Func<CancellationToken, IProgress<int>, Task>)delegate(CancellationToken ct, IProgress<int> progress) {
                int i = 0;
                return TaskObservableExtensions.ToTask<TSource>(Observable.DefaultIfEmpty<TSource>(Observable.Do<TSource>(source, (Action<TSource>)delegate {
                    progress.Report(++i);
                })), ct);
            });
        }
        public static IAsyncActionWithProgress<TProgress> ToAsyncActionWithProgress<[System.Runtime.CompilerServices.Nullable(2)] TSource, [System.Runtime.CompilerServices.Nullable(2)] TProgress>(this IObservable<TSource> source, Func<IObservable<TSource>, IObservable<TProgress>> progressSelector)
        {
            if (source == null)
                throw new ArgumentNullException("source");
            if (progressSelector == null)
                throw new ArgumentNullException("progressSelector");
            return AsyncInfo.Run<TProgress>((Func<CancellationToken, IProgress<TProgress>, Task>)((CancellationToken ct, IProgress<TProgress> progress) => TaskObservableExtensions.ToTask<TSource>(Observable.Create<TSource>((Func<IObserver<TSource>, IDisposable>)delegate(IObserver<TSource> observer) {
                IObserver<TSource> observer2 = Observer.Synchronize<TSource>(observer);
                IConnectableObservable<TSource> connectableObservable = Observable.Publish<TSource>(source);
                IObservable<TProgress> source2 = progressSelector(connectableObservable);
                IProgress<TProgress> progress2 = progress;
                Action<TProgress> onNext = progress2.Report;
                IObserver<TSource> observer3 = observer2;
                IDisposable disposable = ObservableExtensions.Subscribe<TProgress>(source2, onNext, (Action<Exception>)observer3.OnError);
                IDisposable disposable2 = Observable.DefaultIfEmpty<TSource>((IObservable<TSource>)connectableObservable).Subscribe(observer2);
                IDisposable disposable3 = connectableObservable.Connect();
                return StableCompositeDisposable.CreateTrusted(disposable, disposable2, disposable3);
            }), ct)));
        }
        public static IAsyncOperation<TSource> ToAsyncOperation<[System.Runtime.CompilerServices.Nullable(2)] TSource>(this IObservable<TSource> source)
        {
            if (source == null)
                throw new ArgumentNullException("source");
            return AsyncInfo.Run<TSource>((Func<CancellationToken, Task<TSource>>)((CancellationToken ct) => TaskObservableExtensions.ToTask<TSource>(source, ct)));
        }
        public static IAsyncOperationWithProgress<TSource, int> ToAsyncOperationWithProgress<[System.Runtime.CompilerServices.Nullable(2)] TSource>(this IObservable<TSource> source)
        {
            if (source == null)
                throw new ArgumentNullException("source");
            return AsyncInfo.Run<TSource, int>((Func<CancellationToken, IProgress<int>, Task<TSource>>)delegate(CancellationToken ct, IProgress<int> progress) {
                int i = 0;
                return TaskObservableExtensions.ToTask<TSource>(Observable.Do<TSource>(source, (Action<TSource>)delegate {
                    progress.Report(++i);
                }), ct);
            });
        }
        public static IAsyncOperationWithProgress<TResult, int> ToAsyncOperationWithProgress<[System.Runtime.CompilerServices.Nullable(2)] TSource, [System.Runtime.CompilerServices.Nullable(2)] TResult>(this IObservable<TSource> source, Func<IObservable<TSource>, IObservable<TResult>> resultSelector)
        {
            if (source == null)
                throw new ArgumentNullException("source");
            if (resultSelector == null)
                throw new ArgumentNullException("resultSelector");
            return AsyncInfo.Run<TResult, int>((Func<CancellationToken, IProgress<int>, Task<TResult>>)delegate(CancellationToken ct, IProgress<int> progress) {
                int i = 0;
                return TaskObservableExtensions.ToTask<TResult>(resultSelector(Observable.Do<TSource>(source, (Action<TSource>)delegate {
                    progress.Report(++i);
                })), ct);
            });
        }
        public static IAsyncOperationWithProgress<TResult, TProgress> ToAsyncOperationWithProgress<[System.Runtime.CompilerServices.Nullable(2)] TSource, [System.Runtime.CompilerServices.Nullable(2)] TResult, [System.Runtime.CompilerServices.Nullable(2)] TProgress>(this IObservable<TSource> source, Func<IObservable<TSource>, IObservable<TResult>> resultSelector, Func<IObservable<TSource>, IObservable<TProgress>> progressSelector)
        {
            if (source == null)
                throw new ArgumentNullException("source");
            if (resultSelector == null)
                throw new ArgumentNullException("resultSelector");
            if (progressSelector == null)
                throw new ArgumentNullException("progressSelector");
            return AsyncInfo.Run<TResult, TProgress>((Func<CancellationToken, IProgress<TProgress>, Task<TResult>>)((CancellationToken ct, IProgress<TProgress> progress) => TaskObservableExtensions.ToTask<TResult>(Observable.Create<TResult>((Func<IObserver<TResult>, IDisposable>)delegate(IObserver<TResult> observer) {
                IObserver<TResult> observer2 = Observer.Synchronize<TResult>(observer);
                IConnectableObservable<TSource> connectableObservable = Observable.Publish<TSource>(source);
                IObservable<TProgress> source2 = progressSelector(connectableObservable);
                IProgress<TProgress> progress2 = progress;
                Action<TProgress> onNext = progress2.Report;
                IObserver<TResult> observer3 = observer2;
                IDisposable disposable = ObservableExtensions.Subscribe<TProgress>(source2, onNext, (Action<Exception>)observer3.OnError);
                IDisposable disposable2 = resultSelector(connectableObservable).Subscribe(observer2);
                IDisposable disposable3 = connectableObservable.Connect();
                return StableCompositeDisposable.CreateTrusted(disposable, disposable2, disposable3);
            }), ct)));
        }
    }
}