Thursday, December 12, 2019

Handling PubSub updates from multiple clients in pub-sub scenario with Reactive Extentions

A typical pub-sub scenario is subscribing to handle requests from multiple clients and then handling client updates in a buffered way. Let's see how this could be done using Reactive Extensions.

To make this scenario more concrete - imagine your server receives a stream of frequent updates on number of stocks (e.g. IBM_1...IBM_10 in example below). We want to process the most recent price per stock and display it to a trader at most once a second.

All we would need - is to use ReactivePubSubHandler given below, providing "selector" to identify a pub-sub client (i.e. what stock was updated) and an action to be implemented (e.g. update price display) - say once in a second.



using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;

namespace ReactivePubSub
{
    class Program
    {
        class ClientUpdate
        {
            public string StockName { getset; }
            public float Price { getset; }
        }

        static void Main()
        {
            var sampledAction = new ReactivePubSubHandler<ClientUpdatestring>(1000, _=>_.StockName, _ => Console.WriteLine(
                $"{DateTime.Now} Processed update for client #{_.StockName} Price={_.Price}"));

            var rndStockName = new Random();
            
            for (int i = 0; i < 1000; i++)
            {
                var clientUpdate = new ClientUpdate()
                {
                    StockName = "IBM_" + rndStockName.Next(0, 10),
                    Price = (float) (rndStockName.Next(10000, 11000) / 100.0)
                };
                sampledAction.OnClientRequest(clientUpdate);
                Thread.Sleep(200);
            }
        }
    }

    public class ReactivePubSubHandler<TClientDataTClientDataKey>
    {
        public ReactivePubSubHandler(int sampleIntervalMs, Func<TClientDataTClientDataKey> pubSubTopicSelector, Action<TClientData> pubSubAction)
        {
            var sampledActionsSubscription = _clientsSubscription.GroupBy(pubSubTopicSelector)
                .Select(x => x.Sample(TimeSpan.FromMilliseconds(sampleIntervalMs))).SelectMany(x => x);
            sampledActionsSubscription.Subscribe(pubSubAction);
        }

        public void OnClientRequest(TClientData clientData)
        {
            _clientsSubscription.OnNext(clientData);
        }

        private readonly Subject<TClientData> _clientsSubscription = new Subject<TClientData>();
    }
}