AsyncInfoObservable
Provides a set of extension methods to expose observable sequences as Windows Runtime asynchronous actions and operations.
using System.Reactive.Disposables;
using System.Reactive.Subjects;
using System.Reactive.Threading.Tasks;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices.WindowsRuntime;
using System.Threading;
using System.Threading.Tasks;
using Windows.Foundation;
namespace System.Reactive.Linq
{
[System.Runtime.CompilerServices.NullableContext(1)]
[System.Runtime.CompilerServices.Nullable(0)]
[CLSCompliant(false)]
public static class AsyncInfoObservable
{
/// <summary>
/// Exposes an observable sequence as a Windows Runtime asynchronous action.
/// </summary>
/// <typeparam name="TSource">Element type of the source sequence.</typeparam>
/// <param name="source">Sequence to expose as an asynchronous action.</param>
/// <returns>
/// The action returned by <c>AsyncInfo.Run</c> for a task created from
/// <paramref name="source"/> via <c>DefaultIfEmpty().ToTask(ct)</c>.
/// </returns>
/// <exception cref="ArgumentNullException"><paramref name="source"/> is <see langword="null"/>.</exception>
public static IAsyncAction ToAsyncAction<[System.Runtime.CompilerServices.Nullable(2)] TSource>(this IObservable<TSource> source)
{
    if (source is null)
    {
        throw new ArgumentNullException(nameof(source));
    }

    // The delegate is explicitly typed as returning a plain Task so the non-generic
    // AsyncInfo.Run overload (producing IAsyncAction) is chosen instead of the Task<TResult> one.
    // DefaultIfEmpty is applied before ToTask; presumably so an empty source does not fault the task.
    // The cancellation token supplied by AsyncInfo.Run is passed straight to ToTask.
    Func<CancellationToken, Task> taskProvider = ct => source.DefaultIfEmpty().ToTask(ct);

    return AsyncInfo.Run(taskProvider);
}
/// <summary>
/// Exposes an observable sequence as a Windows Runtime asynchronous action that reports
/// an integer progress value for every element of the sequence.
/// </summary>
/// <typeparam name="TSource">Element type of the source sequence.</typeparam>
/// <param name="source">Sequence to expose as an asynchronous action.</param>
/// <returns>
/// The action returned by <c>AsyncInfo.Run</c>. Each element observed on
/// <paramref name="source"/> reports the running element count (1 for the first element).
/// </returns>
/// <exception cref="ArgumentNullException"><paramref name="source"/> is <see langword="null"/>.</exception>
public static IAsyncActionWithProgress<int> ToAsyncActionWithProgress<[System.Runtime.CompilerServices.Nullable(2)] TSource>(this IObservable<TSource> source)
{
    if (source is null)
    {
        throw new ArgumentNullException(nameof(source));
    }

    Func<CancellationToken, IProgress<int>, Task> taskProvider = (ct, progress) =>
    {
        // The counter lives inside the delegate, so every invocation starts from zero.
        // It is pre-incremented: the value reported for the first element is 1.
        var seen = 0;

        // DefaultIfEmpty is applied after the progress side effect and before ToTask;
        // presumably so an empty source does not fault the task.
        return source
            .Do(_ => progress.Report(++seen))
            .DefaultIfEmpty()
            .ToTask(ct);
    };

    return AsyncInfo.Run<int>(taskProvider);
}
/// <summary>
/// Exposes an observable sequence as a Windows Runtime asynchronous action whose progress
/// notifications come from a sequence derived from the source.
/// </summary>
/// <typeparam name="TSource">Element type of the source sequence.</typeparam>
/// <typeparam name="TProgress">Type of the reported progress values.</typeparam>
/// <param name="source">Sequence to expose as an asynchronous action.</param>
/// <param name="progressSelector">
/// Function that receives the published source and returns the sequence whose elements
/// are handed to the progress reporter.
/// </param>
/// <returns>The action returned by <c>AsyncInfo.Run</c>.</returns>
/// <exception cref="ArgumentNullException">
/// <paramref name="source"/> or <paramref name="progressSelector"/> is <see langword="null"/>.
/// </exception>
public static IAsyncActionWithProgress<TProgress> ToAsyncActionWithProgress<[System.Runtime.CompilerServices.Nullable(2)] TSource, [System.Runtime.CompilerServices.Nullable(2)] TProgress>(this IObservable<TSource> source, Func<IObservable<TSource>, IObservable<TProgress>> progressSelector)
{
    if (source is null)
    {
        throw new ArgumentNullException(nameof(source));
    }

    if (progressSelector is null)
    {
        throw new ArgumentNullException(nameof(progressSelector));
    }

    Func<CancellationToken, IProgress<TProgress>, Task> taskProvider = (ct, progress) =>
    {
        Func<IObserver<TSource>, IDisposable> subscribe = observer =>
        {
            // Both subscriptions below can signal the same observer, so it is wrapped
            // with Observer.Synchronize first (presumably to serialize those calls).
            var gate = Observer.Synchronize(observer);

            // The source is published so the progress sequence and the completion
            // subscription are both attached before Connect is called.
            var published = source.Publish();
            var progressSequence = progressSelector(published);

            Action<TProgress> reportProgress = progress.Report;
            Action<Exception> forwardError = gate.OnError;

            // Progress values go to the reporter; an error in the progress sequence
            // is forwarded to the downstream observer.
            var progressSubscription = progressSequence.Subscribe(reportProgress, forwardError);
            var dataSubscription = published.DefaultIfEmpty().Subscribe(gate);

            // Connect last, once both subscribers are in place.
            var connection = published.Connect();

            return StableCompositeDisposable.CreateTrusted(progressSubscription, dataSubscription, connection);
        };

        return Observable.Create(subscribe).ToTask(ct);
    };

    return AsyncInfo.Run<TProgress>(taskProvider);
}
/// <summary>
/// Exposes an observable sequence as a Windows Runtime asynchronous operation.
/// </summary>
/// <typeparam name="TSource">Element type of the source sequence and result type of the operation.</typeparam>
/// <param name="source">Sequence to expose as an asynchronous operation.</param>
/// <returns>
/// The operation returned by <c>AsyncInfo.Run</c> for the task created by calling
/// <c>ToTask(ct)</c> on <paramref name="source"/>.
/// </returns>
/// <exception cref="ArgumentNullException"><paramref name="source"/> is <see langword="null"/>.</exception>
public static IAsyncOperation<TSource> ToAsyncOperation<[System.Runtime.CompilerServices.Nullable(2)] TSource>(this IObservable<TSource> source)
{
    if (source is null)
    {
        throw new ArgumentNullException(nameof(source));
    }

    // Unlike the action variants, no DefaultIfEmpty is applied here: the task's
    // result is taken directly from the source sequence.
    Func<CancellationToken, Task<TSource>> taskProvider = ct => source.ToTask(ct);

    return AsyncInfo.Run<TSource>(taskProvider);
}
/// <summary>
/// Exposes an observable sequence as a Windows Runtime asynchronous operation that reports
/// an integer progress value for every element of the sequence.
/// </summary>
/// <typeparam name="TSource">Element type of the source sequence and result type of the operation.</typeparam>
/// <param name="source">Sequence to expose as an asynchronous operation.</param>
/// <returns>
/// The operation returned by <c>AsyncInfo.Run</c>. Each element observed on
/// <paramref name="source"/> reports the running element count (1 for the first element).
/// </returns>
/// <exception cref="ArgumentNullException"><paramref name="source"/> is <see langword="null"/>.</exception>
public static IAsyncOperationWithProgress<TSource, int> ToAsyncOperationWithProgress<[System.Runtime.CompilerServices.Nullable(2)] TSource>(this IObservable<TSource> source)
{
    if (source is null)
    {
        throw new ArgumentNullException(nameof(source));
    }

    Func<CancellationToken, IProgress<int>, Task<TSource>> taskProvider = (ct, progress) =>
    {
        // The counter lives inside the delegate, so every invocation starts from zero.
        // It is pre-incremented: the value reported for the first element is 1.
        var seen = 0;

        return source
            .Do(_ => progress.Report(++seen))
            .ToTask(ct);
    };

    return AsyncInfo.Run<TSource, int>(taskProvider);
}
/// <summary>
/// Exposes an observable sequence as a Windows Runtime asynchronous operation whose result
/// comes from a sequence derived from the source, reporting an integer progress value for
/// every source element.
/// </summary>
/// <typeparam name="TSource">Element type of the source sequence.</typeparam>
/// <typeparam name="TResult">Result type of the operation.</typeparam>
/// <param name="source">Sequence to expose as an asynchronous operation.</param>
/// <param name="resultSelector">
/// Function that receives the progress-reporting source and returns the sequence that is
/// converted to the operation's task.
/// </param>
/// <returns>
/// The operation returned by <c>AsyncInfo.Run</c>. Each element observed on
/// <paramref name="source"/> reports the running element count (1 for the first element).
/// </returns>
/// <exception cref="ArgumentNullException">
/// <paramref name="source"/> or <paramref name="resultSelector"/> is <see langword="null"/>.
/// </exception>
public static IAsyncOperationWithProgress<TResult, int> ToAsyncOperationWithProgress<[System.Runtime.CompilerServices.Nullable(2)] TSource, [System.Runtime.CompilerServices.Nullable(2)] TResult>(this IObservable<TSource> source, Func<IObservable<TSource>, IObservable<TResult>> resultSelector)
{
    if (source is null)
    {
        throw new ArgumentNullException(nameof(source));
    }

    if (resultSelector is null)
    {
        throw new ArgumentNullException(nameof(resultSelector));
    }

    Func<CancellationToken, IProgress<int>, Task<TResult>> taskProvider = (ct, progress) =>
    {
        // The counter lives inside the delegate, so every invocation starts from zero.
        // It is pre-incremented: the value reported for the first element is 1.
        var seen = 0;

        // Progress is attached to the source itself, before the selector sees it, so
        // the count reflects source elements rather than elements of the result sequence.
        var counted = source.Do(_ => progress.Report(++seen));

        return resultSelector(counted).ToTask(ct);
    };

    return AsyncInfo.Run<TResult, int>(taskProvider);
}
/// <summary>
/// Exposes an observable sequence as a Windows Runtime asynchronous operation whose result
/// and progress notifications both come from sequences derived from the source.
/// </summary>
/// <typeparam name="TSource">Element type of the source sequence.</typeparam>
/// <typeparam name="TResult">Result type of the operation.</typeparam>
/// <typeparam name="TProgress">Type of the reported progress values.</typeparam>
/// <param name="source">Sequence to expose as an asynchronous operation.</param>
/// <param name="resultSelector">
/// Function that receives the published source and returns the sequence that feeds the
/// operation's task.
/// </param>
/// <param name="progressSelector">
/// Function that receives the published source and returns the sequence whose elements
/// are handed to the progress reporter.
/// </param>
/// <returns>The operation returned by <c>AsyncInfo.Run</c>.</returns>
/// <exception cref="ArgumentNullException">
/// <paramref name="source"/>, <paramref name="resultSelector"/> or
/// <paramref name="progressSelector"/> is <see langword="null"/>.
/// </exception>
public static IAsyncOperationWithProgress<TResult, TProgress> ToAsyncOperationWithProgress<[System.Runtime.CompilerServices.Nullable(2)] TSource, [System.Runtime.CompilerServices.Nullable(2)] TResult, [System.Runtime.CompilerServices.Nullable(2)] TProgress>(this IObservable<TSource> source, Func<IObservable<TSource>, IObservable<TResult>> resultSelector, Func<IObservable<TSource>, IObservable<TProgress>> progressSelector)
{
    if (source is null)
    {
        throw new ArgumentNullException(nameof(source));
    }

    if (resultSelector is null)
    {
        throw new ArgumentNullException(nameof(resultSelector));
    }

    if (progressSelector is null)
    {
        throw new ArgumentNullException(nameof(progressSelector));
    }

    Func<CancellationToken, IProgress<TProgress>, Task<TResult>> taskProvider = (ct, progress) =>
    {
        Func<IObserver<TResult>, IDisposable> subscribe = observer =>
        {
            // Both subscriptions below can signal the same observer, so it is wrapped
            // with Observer.Synchronize first (presumably to serialize those calls).
            var gate = Observer.Synchronize(observer);

            // The source is published so the progress and result sequences are both
            // attached before Connect is called.
            var published = source.Publish();
            var progressSequence = progressSelector(published);

            Action<TProgress> reportProgress = progress.Report;
            Action<Exception> forwardError = gate.OnError;

            // Progress values go to the reporter; an error in the progress sequence
            // is forwarded to the downstream observer.
            var progressSubscription = progressSequence.Subscribe(reportProgress, forwardError);
            var resultSubscription = resultSelector(published).Subscribe(gate);

            // Connect last, once both subscribers are in place.
            var connection = published.Connect();

            return StableCompositeDisposable.CreateTrusted(progressSubscription, resultSubscription, connection);
        };

        return Observable.Create(subscribe).ToTask(ct);
    };

    return AsyncInfo.Run<TResult, TProgress>(taskProvider);
}
}
}