| A typical pub-sub scenario is subscribing to requests from multiple clients and then handling the client updates in a buffered way. Let's see how this can be done using Reactive Extensions. To make the scenario more concrete, imagine your server receives a stream of frequent updates on a number of stocks (e.g. IBM_0...IBM_9 in the example below). We want to process only the most recent price per stock and display it to a trader at most once a second. All we need is the ReactivePubSubHandler given below, supplied with a "selector" that identifies the pub-sub topic (i.e. which stock was updated) and an action to perform (e.g. update the price display) at most once per second for each stock.  | 
| using System; 
using System.Reactive.Linq; 
using System.Reactive.Subjects; 
using System.Threading; 
namespace ReactivePubSub 
{ 
    class Program 
    { 
        class ClientUpdate 
        { 
            public string StockName { get; set; } 
            public float Price { get; set; } 
        } 
        static void Main() 
        { 
            var sampledAction = new ReactivePubSubHandler<ClientUpdate, string>(1000, _=>_.StockName, _ => Console.WriteLine( 
                $"{DateTime.Now} Processed update for client #{_.StockName} Price={_.Price}")); 
            var rndStockName = new Random(); 
            for (int i = 0; i < 1000; i++) 
            { 
                var clientUpdate = new ClientUpdate() 
                { 
                    StockName = "IBM_" + rndStockName.Next(0, 10), 
                    Price = (float) (rndStockName.Next(10000, 11000) / 100.0) 
                }; 
                sampledAction.OnClientRequest(clientUpdate); 
                Thread.Sleep(200); 
            } 
        } 
    } 
    public class ReactivePubSubHandler<TClientData, TClientDataKey> 
    { 
        public ReactivePubSubHandler(int sampleIntervalMs, Func<TClientData, TClientDataKey> pubSubTopicSelector, Action<TClientData> pubSubAction) 
        { 
            var sampledActionsSubscription = _clientsSubscription.GroupBy(pubSubTopicSelector) 
                .Select(x => x.Sample(TimeSpan.FromMilliseconds(sampleIntervalMs))).SelectMany(x => x); 
            sampledActionsSubscription.Subscribe(pubSubAction); 
        } 
        public void OnClientRequest(TClientData clientData) 
        { 
            _clientsSubscription.OnNext(clientData); 
        } 
        private readonly Subject<TClientData> _clientsSubscription = new Subject<TClientData>(); 
    } 
} 
 |