RemotingObservable
Provides a set of static methods for exposing observable sequences through .NET Remoting.
            
                using System.Linq.Expressions;
using System.Reactive.Disposables;
using System.Reflection;
using System.Runtime.CompilerServices;
using System.Runtime.Remoting;
using System.Runtime.Remoting.Lifetime;
using System.Security;
using System.Threading;
namespace System.Reactive.Linq
{
    [System.Runtime.CompilerServices.NullableContext(1)]
    [System.Runtime.CompilerServices.Nullable(0)]
    public static class RemotingObservable
    {
        [Serializable]
        [System.Runtime.CompilerServices.Nullable(0)]
        private class SerializableObservable<[System.Runtime.CompilerServices.Nullable(2)] T> : IObservable<T>
        {
            private readonly RemotableObservable<T> _remotableObservable;
            public SerializableObservable(RemotableObservable<T> remotableObservable)
            {
                _remotableObservable = remotableObservable;
            }
            public IDisposable Subscribe(IObserver<T> observer)
            {
                ISafeObserver<T> safeObserver = SafeObserver<T>.Wrap(observer);
                IDisposable disposable = _remotableObservable.Subscribe(new RemotableObserver<T>(safeObserver));
                safeObserver.SetResource(disposable);
                return disposable;
            }
        }
        [System.Runtime.CompilerServices.Nullable(0)]
        private class RemotableObserver<[System.Runtime.CompilerServices.Nullable(2)] T> : MarshalByRefObject, IObserver<T>
        {
            private readonly IObserver<T> _underlyingObserver;
            public RemotableObserver(IObserver<T> underlyingObserver)
            {
                _underlyingObserver = underlyingObserver;
            }
            public void OnNext(T value)
            {
                _underlyingObserver.OnNext(value);
            }
            public void OnError(Exception exception)
            {
                try {
                    _underlyingObserver.OnError(exception);
                } finally {
                    Unregister();
                }
            }
            public void OnCompleted()
            {
                try {
                    _underlyingObserver.OnCompleted();
                } finally {
                    Unregister();
                }
            }
            [SecuritySafeCritical]
            private void Unregister()
            {
                ? val = RemotingServices.GetLifetimeService((MarshalByRefObject)this);
                if ((int)val != 0)
                    val.Unregister(this);
            }
            [SecurityCritical]
            public override object InitializeLifetimeService()
            {
                ? val = base.InitializeLifetimeService();
                val.Register(this);
                return (object)val;
            }
            [SecurityCritical]
            TimeSpan Renewal(ILease lease)
            {
                return lease.get_InitialLeaseTime();
            }
        }
        [Serializable]
        [System.Runtime.CompilerServices.Nullable(0)]
        private sealed class RemotableObservable<[System.Runtime.CompilerServices.Nullable(2)] T> : MarshalByRefObject, IObservable<T>
        {
            [System.Runtime.CompilerServices.Nullable(0)]
            private sealed class RemotableSubscription : MarshalByRefObject, IDisposable
            {
                private IDisposable _underlyingSubscription;
                public RemotableSubscription(IDisposable underlyingSubscription)
                {
                    _underlyingSubscription = underlyingSubscription;
                }
                public void Dispose()
                {
                    using (Interlocked.Exchange<IDisposable>(ref _underlyingSubscription, Disposable.Empty))
                        Unregister();
                }
                [SecuritySafeCritical]
                private void Unregister()
                {
                    ? val = RemotingServices.GetLifetimeService((MarshalByRefObject)this);
                    if ((int)val != 0)
                        val.Unregister(this);
                }
                [SecurityCritical]
                public override object InitializeLifetimeService()
                {
                    ? val = base.InitializeLifetimeService();
                    val.Register(this);
                    return (object)val;
                }
                [SecurityCritical]
                TimeSpan Renewal(ILease lease)
                {
                    return lease.get_InitialLeaseTime();
                }
            }
            private readonly IObservable<T> _underlyingObservable;
            [System.Runtime.CompilerServices.Nullable(2)]
            private readonly ILease _lease;
            public RemotableObservable(IObservable<T> underlyingObservable, [System.Runtime.CompilerServices.Nullable(2)] ILease lease)
            {
                _underlyingObservable = underlyingObservable;
                _lease = lease;
            }
            public IDisposable Subscribe(IObserver<T> observer)
            {
                return new RemotableSubscription(_underlyingObservable.Subscribe(observer));
            }
            [System.Runtime.CompilerServices.NullableContext(2)]
            [SecurityCritical]
            public override object InitializeLifetimeService()
            {
                return _lease;
            }
        }
        public static IObservable<TSource> Remotable<[System.Runtime.CompilerServices.Nullable(2)] TSource>(this IObservable<TSource> source)
        {
            if (source == null)
                throw new ArgumentNullException("source");
            return Remotable_(source);
        }
        public static IObservable<TSource> Remotable<[System.Runtime.CompilerServices.Nullable(2)] TSource>(this IObservable<TSource> source, ILease lease)
        {
            if (source == null)
                throw new ArgumentNullException("source");
            return Remotable_(source, lease);
        }
        public static IQbservable<TSource> Remotable<[System.Runtime.CompilerServices.Nullable(2)] TSource>(this IQbservable<TSource> source)
        {
            if (source == null)
                throw new ArgumentNullException("source");
            return source.Provider.CreateQuery<TSource>(Expression.Call(null, ((MethodInfo)MethodBase.GetCurrentMethod()).MakeGenericMethod(typeof(TSource)), source.Expression));
        }
        public static IQbservable<TSource> Remotable<[System.Runtime.CompilerServices.Nullable(2)] TSource>(this IQbservable<TSource> source, ILease lease)
        {
            if (source == null)
                throw new ArgumentNullException("source");
            return source.Provider.CreateQuery<TSource>(Expression.Call(null, ((MethodInfo)MethodBase.GetCurrentMethod()).MakeGenericMethod(typeof(TSource)), source.Expression, Expression.Constant(lease, typeof(ILease))));
        }
        private static IObservable<TSource> Remotable_<[System.Runtime.CompilerServices.Nullable(2)] TSource>(IObservable<TSource> source)
        {
            return new SerializableObservable<TSource>(new RemotableObservable<TSource>(source, null));
        }
        private static IObservable<TSource> Remotable_<[System.Runtime.CompilerServices.Nullable(2)] TSource>(IObservable<TSource> source, [System.Runtime.CompilerServices.Nullable(2)] ILease lease)
        {
            return new SerializableObservable<TSource>(new RemotableObservable<TSource>(source, lease));
        }
    }
}