<PackageReference Include="System.Reactive" Version="4.2.2" />

RemotingObservable

public static class RemotingObservable
Provides a set of static methods for exposing observable sequences through .NET Remoting.
using System.Linq.Expressions;
using System.Reactive.Disposables;
using System.Reflection;
using System.Runtime.Remoting;
using System.Runtime.Remoting.Lifetime;
using System.Security;
using System.Threading;

namespace System.Reactive.Linq
{
    /// <summary>
    /// Provides a set of static methods for exposing observable sequences through .NET Remoting.
    /// </summary>
    public static class RemotingObservable
    {
        /// <summary>
        /// Serializable handle around a <see cref="RemotableObservable{T}"/>. Each subscription
        /// wraps the caller's observer in a <see cref="RemotableObserver{T}"/> before handing it
        /// to the remotable observable.
        /// </summary>
        /// <typeparam name="T">Element type of the sequence.</typeparam>
        [Serializable]
        private class SerializableObservable<T> : IObservable<T>
        {
            private readonly RemotableObservable<T> _remotableObservable;

            public SerializableObservable(RemotableObservable<T> remotableObservable)
            {
                _remotableObservable = remotableObservable;
            }

            /// <summary>
            /// Subscribes <paramref name="observer"/> through a safe-observer wrapper and a
            /// marshal-by-reference observer proxy.
            /// </summary>
            /// <param name="observer">Observer that receives the notifications.</param>
            /// <returns>The subscription returned by the remotable observable.</returns>
            public IDisposable Subscribe(IObserver<T> observer)
            {
                ISafeObserver<T> safeObserver = SafeObserver<T>.Wrap(observer);

                IDisposable subscription = _remotableObservable.Subscribe(new RemotableObserver<T>(safeObserver));

                // NOTE(review): SetResource presumably lets the safe observer dispose the
                // subscription when the sequence terminates - confirm against SafeObserver<T>.
                safeObserver.SetResource(subscription);

                return subscription;
            }
        }

        /// <summary>
        /// Marshal-by-reference observer that forwards notifications to an underlying observer.
        /// It registers itself as the sponsor of its own lease and unregisters once a terminal
        /// notification (OnError / OnCompleted) has been forwarded.
        /// </summary>
        /// <typeparam name="T">Element type of the sequence.</typeparam>
        private class RemotableObserver<T> : MarshalByRefObject, IObserver<T>, ISponsor
        {
            private readonly IObserver<T> _underlyingObserver;

            public RemotableObserver(IObserver<T> underlyingObserver)
            {
                _underlyingObserver = underlyingObserver;
            }

            public void OnNext(T value)
            {
                _underlyingObserver.OnNext(value);
            }

            public void OnError(Exception exception)
            {
                try
                {
                    _underlyingObserver.OnError(exception);
                }
                finally
                {
                    // Unregister even when the underlying observer throws.
                    Unregister();
                }
            }

            public void OnCompleted()
            {
                try
                {
                    _underlyingObserver.OnCompleted();
                }
                finally
                {
                    // Unregister even when the underlying observer throws.
                    Unregister();
                }
            }

            /// <summary>
            /// Removes this object as a sponsor from its lease, if it has one.
            /// </summary>
            [SecuritySafeCritical]
            private void Unregister()
            {
                // GetLifetimeService returns object; the lifetime service of this object is the
                // ILease handed out by InitializeLifetimeService below.
                ILease lease = (ILease)RemotingServices.GetLifetimeService(this);
                if (lease != null)
                {
                    lease.Unregister(this);
                }
            }

            /// <summary>
            /// Obtains the default lease from the base class and registers this object as its sponsor.
            /// </summary>
            /// <returns>The lease obtained from the base implementation.</returns>
            [SecurityCritical]
            public override object InitializeLifetimeService()
            {
                ILease lease = (ILease)base.InitializeLifetimeService();
                lease.Register(this);
                return lease;
            }

            /// <summary>
            /// Sponsor callback: requests a renewal equal to the lease's initial lease time.
            /// </summary>
            /// <param name="lease">Lease asking for renewal.</param>
            /// <returns>The value of <see cref="ILease.InitialLeaseTime"/>.</returns>
            [SecurityCritical]
            TimeSpan ISponsor.Renewal(ILease lease)
            {
                return lease.InitialLeaseTime;
            }
        }

        /// <summary>
        /// Marshal-by-reference wrapper around an observable sequence whose lifetime service is
        /// the lease supplied at construction (which may be null).
        /// </summary>
        /// <typeparam name="T">Element type of the sequence.</typeparam>
        [Serializable]
        private sealed class RemotableObservable<T> : MarshalByRefObject, IObservable<T>
        {
            /// <summary>
            /// Marshal-by-reference subscription handle. It sponsors its own lease and
            /// unregisters from it when disposed.
            /// </summary>
            private sealed class RemotableSubscription : MarshalByRefObject, IDisposable, ISponsor
            {
                // Not readonly: swapped for Disposable.Empty in Dispose.
                private IDisposable _underlyingSubscription;

                public RemotableSubscription(IDisposable underlyingSubscription)
                {
                    _underlyingSubscription = underlyingSubscription;
                }

                /// <summary>
                /// Unregisters from the lease and disposes the underlying subscription.
                /// </summary>
                public void Dispose()
                {
                    // Atomically replace the stored subscription with an empty disposable so the
                    // real subscription is disposed by at most one Dispose call; the reference to
                    // it is dropped at the same time.
                    using (Interlocked.Exchange<IDisposable>(ref _underlyingSubscription, Disposable.Empty))
                    {
                        Unregister();
                    }
                }

                /// <summary>
                /// Removes this object as a sponsor from its lease, if it has one.
                /// </summary>
                [SecuritySafeCritical]
                private void Unregister()
                {
                    // GetLifetimeService returns object, hence the explicit cast.
                    ILease lease = (ILease)RemotingServices.GetLifetimeService(this);
                    if (lease != null)
                    {
                        lease.Unregister(this);
                    }
                }

                /// <summary>
                /// Obtains the default lease from the base class and registers this object as its sponsor.
                /// </summary>
                /// <returns>The lease obtained from the base implementation.</returns>
                [SecurityCritical]
                public override object InitializeLifetimeService()
                {
                    ILease lease = (ILease)base.InitializeLifetimeService();
                    lease.Register(this);
                    return lease;
                }

                /// <summary>
                /// Sponsor callback: requests a renewal equal to the lease's initial lease time.
                /// </summary>
                /// <param name="lease">Lease asking for renewal.</param>
                /// <returns>The value of <see cref="ILease.InitialLeaseTime"/>.</returns>
                [SecurityCritical]
                TimeSpan ISponsor.Renewal(ILease lease)
                {
                    return lease.InitialLeaseTime;
                }
            }

            private readonly IObservable<T> _underlyingObservable;
            private readonly ILease _lease;

            public RemotableObservable(IObservable<T> underlyingObservable, ILease lease)
            {
                _underlyingObservable = underlyingObservable;
                _lease = lease;
            }

            /// <summary>
            /// Subscribes <paramref name="observer"/> to the underlying sequence and wraps the
            /// resulting subscription in a marshal-by-reference handle.
            /// </summary>
            /// <param name="observer">Observer that receives the notifications.</param>
            /// <returns>A <see cref="RemotableSubscription"/> wrapping the underlying subscription.</returns>
            public IDisposable Subscribe(IObserver<T> observer)
            {
                return new RemotableSubscription(_underlyingObservable.Subscribe(observer));
            }

            /// <summary>
            /// Returns the lease supplied at construction. When that lease is null this returns
            /// null, which .NET Remoting documents as an infinite lifetime.
            /// </summary>
            [SecurityCritical]
            public override object InitializeLifetimeService()
            {
                return _lease;
            }
        }

        /// <summary>
        /// Makes an observable sequence remotable, passing a null lease to the
        /// <see cref="MarshalByRefObject"/> wrapping the source.
        /// </summary>
        /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
        /// <param name="source">Source sequence.</param>
        /// <returns>A serializable observable that wraps the source sequence.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
        public static IObservable<TSource> Remotable<TSource>(this IObservable<TSource> source)
        {
            if (source == null)
            {
                throw new ArgumentNullException("source");
            }

            return Remotable_(source);
        }

        /// <summary>
        /// Makes an observable sequence remotable, using the given lease as the lifetime service
        /// of the <see cref="MarshalByRefObject"/> wrapping the source.
        /// </summary>
        /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
        /// <param name="source">Source sequence.</param>
        /// <param name="lease">Lease for the remotable wrapper. Not validated; null is passed through.</param>
        /// <returns>A serializable observable that wraps the source sequence.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
        public static IObservable<TSource> Remotable<TSource>(this IObservable<TSource> source, ILease lease)
        {
            if (source == null)
            {
                throw new ArgumentNullException("source");
            }

            return Remotable_(source, lease);
        }

        /// <summary>
        /// Query-provider counterpart of <c>Remotable(IObservable)</c>: builds a query whose
        /// expression is a call to this method applied to the source expression.
        /// </summary>
        /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
        /// <param name="source">Source sequence.</param>
        /// <returns>The query created by the source's provider.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
        public static IQbservable<TSource> Remotable<TSource>(this IQbservable<TSource> source)
        {
            if (source == null)
            {
                throw new ArgumentNullException("source");
            }

            // Close the generic method over TSource so it can be embedded in the expression tree.
            MethodInfo method = ((MethodInfo)MethodBase.GetCurrentMethod()).MakeGenericMethod(typeof(TSource));

            return source.Provider.CreateQuery<TSource>(
                Expression.Call(
                    null,
                    method,
                    source.Expression));
        }

        /// <summary>
        /// Query-provider counterpart of <c>Remotable(IObservable, ILease)</c>: builds a query
        /// whose expression is a call to this method applied to the source expression and a
        /// constant holding <paramref name="lease"/>.
        /// </summary>
        /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
        /// <param name="source">Source sequence.</param>
        /// <param name="lease">Lease embedded in the query as a constant. Not validated; null is passed through.</param>
        /// <returns>The query created by the source's provider.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
        public static IQbservable<TSource> Remotable<TSource>(this IQbservable<TSource> source, ILease lease)
        {
            if (source == null)
            {
                throw new ArgumentNullException("source");
            }

            // Close the generic method over TSource so it can be embedded in the expression tree.
            MethodInfo method = ((MethodInfo)MethodBase.GetCurrentMethod()).MakeGenericMethod(typeof(TSource));

            return source.Provider.CreateQuery<TSource>(
                Expression.Call(
                    null,
                    method,
                    source.Expression,
                    Expression.Constant(lease, typeof(ILease))));
        }

        // Wraps the source in a remotable observable with a null lease.
        private static IObservable<TSource> Remotable_<TSource>(IObservable<TSource> source)
        {
            return new SerializableObservable<TSource>(new RemotableObservable<TSource>(source, null));
        }

        // Wraps the source in a remotable observable governed by the given lease.
        private static IObservable<TSource> Remotable_<TSource>(IObservable<TSource> source, ILease lease)
        {
            return new SerializableObservable<TSource>(new RemotableObservable<TSource>(source, lease));
        }
    }
}