Synchronization
Provides basic synchronization and scheduling services for observable sequences.
using System.ComponentModel;
using System.Reactive.Disposables;
using System.Threading;
namespace System.Reactive.Concurrency
{
/// <summary>
/// Provides basic synchronization and scheduling services for observable sequences.
/// </summary>
[EditorBrowsable(EditorBrowsableState.Advanced)]
public static class Synchronization
{
    /// <summary>
    /// Wraps <paramref name="source"/> so that subscribing to it happens inside an action
    /// scheduled on <paramref name="scheduler"/>.
    /// </summary>
    /// <typeparam name="TSource">Element type of the sequence.</typeparam>
    /// <param name="source">Sequence to subscribe to.</param>
    /// <param name="scheduler">Scheduler the subscribe action is handed to.</param>
    /// <returns>
    /// An observable whose subscription handle is a <c>SerialDisposable</c>. It first holds
    /// the handle of the scheduled action and, once that action has run, the source
    /// subscription wrapped in a <c>ScheduledDisposable</c> built with the same scheduler.
    /// </returns>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="source"/> or <paramref name="scheduler"/> is null.
    /// </exception>
    public static IObservable<TSource> SubscribeOn<TSource>(IObservable<TSource> source, IScheduler scheduler)
    {
        if (source == null)
        {
            throw new ArgumentNullException("source");
        }

        if (scheduler == null)
        {
            throw new ArgumentNullException("scheduler");
        }

        Func<IObserver<TSource>, IDisposable> subscribe = observer =>
        {
            var pendingWork = new SingleAssignmentDisposable();
            var handle = new SerialDisposable { Disposable = pendingWork };

            Action subscribeToSource = () =>
            {
                var inner = ObservableExtensions.SubscribeSafe<TSource>(source, observer);

                // Swap the scheduled-work handle for the real subscription.
                // NOTE(review): ScheduledDisposable presumably routes Dispose through the
                // scheduler - confirm against its definition.
                handle.Disposable = new ScheduledDisposable(scheduler, inner);
            };

            // If the scheduler runs the action synchronously, 'handle' has already been
            // re-pointed by the time this assignment happens; the order is kept as-is.
            pendingWork.Disposable = scheduler.Schedule(subscribeToSource);

            return handle;
        };

        return new AnonymousObservable<TSource>(subscribe);
    }

    /// <summary>
    /// Wraps <paramref name="source"/> so that subscribing to it happens inside a callback
    /// handed to <paramref name="context"/> through <c>PostWithStartComplete</c>.
    /// </summary>
    /// <typeparam name="TSource">Element type of the sequence.</typeparam>
    /// <param name="source">Sequence to subscribe to.</param>
    /// <param name="context">Synchronization context the subscribe callback is posted to.</param>
    /// <returns>
    /// An observable whose subscription handle is a <c>SingleAssignmentDisposable</c>. The
    /// posted callback subscribes to the source only if that handle has not been disposed
    /// yet, and stores the subscription wrapped in a <c>ContextDisposable</c>.
    /// </returns>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="source"/> or <paramref name="context"/> is null.
    /// </exception>
    public static IObservable<TSource> SubscribeOn<TSource>(IObservable<TSource> source, SynchronizationContext context)
    {
        if (source == null)
        {
            throw new ArgumentNullException("source");
        }

        if (context == null)
        {
            throw new ArgumentNullException("context");
        }

        Func<IObserver<TSource>, IDisposable> subscribe = observer =>
        {
            var handle = new SingleAssignmentDisposable();

            context.PostWithStartComplete(() =>
            {
                // The subscriber may have disposed the handle before the callback ran;
                // in that case the source is never subscribed to.
                if (handle.IsDisposed)
                {
                    return;
                }

                var inner = ObservableExtensions.SubscribeSafe<TSource>(source, observer);

                // NOTE(review): ContextDisposable presumably performs Dispose via the
                // context - confirm against its definition.
                handle.Disposable = new ContextDisposable(context, inner);
            });

            return handle;
        };

        return new AnonymousObservable<TSource>(subscribe);
    }

    /// <summary>
    /// Creates an <c>ObserveOn&lt;TSource&gt;.Scheduler</c> over <paramref name="source"/> and
    /// <paramref name="scheduler"/>.
    /// </summary>
    /// <typeparam name="TSource">Element type of the sequence.</typeparam>
    /// <param name="source">Sequence to wrap.</param>
    /// <param name="scheduler">
    /// Scheduler passed to the wrapper (presumably used to deliver observer notifications -
    /// verify against <c>ObserveOn&lt;TSource&gt;</c>).
    /// </param>
    /// <returns>The newly created wrapper sequence.</returns>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="source"/> or <paramref name="scheduler"/> is null.
    /// </exception>
    public static IObservable<TSource> ObserveOn<TSource>(IObservable<TSource> source, IScheduler scheduler)
    {
        if (source == null)
        {
            throw new ArgumentNullException("source");
        }

        if (scheduler == null)
        {
            throw new ArgumentNullException("scheduler");
        }

        return new ObserveOn<TSource>.Scheduler(source, scheduler);
    }

    /// <summary>
    /// Creates an <c>ObserveOn&lt;TSource&gt;.Context</c> over <paramref name="source"/> and
    /// <paramref name="context"/>.
    /// </summary>
    /// <typeparam name="TSource">Element type of the sequence.</typeparam>
    /// <param name="source">Sequence to wrap.</param>
    /// <param name="context">
    /// Synchronization context passed to the wrapper (presumably used to deliver observer
    /// notifications - verify against <c>ObserveOn&lt;TSource&gt;</c>).
    /// </param>
    /// <returns>The newly created wrapper sequence.</returns>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="source"/> or <paramref name="context"/> is null.
    /// </exception>
    public static IObservable<TSource> ObserveOn<TSource>(IObservable<TSource> source, SynchronizationContext context)
    {
        if (source == null)
        {
            throw new ArgumentNullException("source");
        }

        if (context == null)
        {
            throw new ArgumentNullException("context");
        }

        return new ObserveOn<TSource>.Context(source, context);
    }

    /// <summary>
    /// Creates a <c>Synchronize&lt;TSource&gt;</c> wrapper over <paramref name="source"/>
    /// without supplying a gate object.
    /// </summary>
    /// <typeparam name="TSource">Element type of the sequence.</typeparam>
    /// <param name="source">Sequence to wrap.</param>
    /// <returns>The newly created wrapper sequence.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
    public static IObservable<TSource> Synchronize<TSource>(IObservable<TSource> source)
    {
        if (source == null)
        {
            throw new ArgumentNullException("source");
        }

        return new Synchronize<TSource>(source);
    }

    /// <summary>
    /// Creates a <c>Synchronize&lt;TSource&gt;</c> wrapper over <paramref name="source"/>
    /// using the caller-supplied <paramref name="gate"/>.
    /// </summary>
    /// <typeparam name="TSource">Element type of the sequence.</typeparam>
    /// <param name="source">Sequence to wrap.</param>
    /// <param name="gate">
    /// Object handed to the wrapper (presumably the lock taken around observer callbacks -
    /// verify against <c>Synchronize&lt;TSource&gt;</c>).
    /// </param>
    /// <returns>The newly created wrapper sequence.</returns>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="source"/> or <paramref name="gate"/> is null.
    /// </exception>
    public static IObservable<TSource> Synchronize<TSource>(IObservable<TSource> source, object gate)
    {
        if (source == null)
        {
            throw new ArgumentNullException("source");
        }

        if (gate == null)
        {
            throw new ArgumentNullException("gate");
        }

        return new Synchronize<TSource>(source, gate);
    }
}
}