A typical pub-sub scenario is subscribing to requests from multiple clients and then handling the client updates in a buffered way. Let's see how this can be done using Reactive Extensions. To make the scenario more concrete, imagine your server receives a stream of frequent price updates for a number of stocks (e.g. IBM_0…IBM_9 in the example below). We want to process only the most recent price per stock and display it to a trader at most once a second. All we need is the ReactivePubSubHandler given below: we provide a "selector" that identifies the pub-sub client (i.e. which stock was updated) and an action to perform (e.g. update the price display), which will run at most once per sampling interval - say, once a second. |
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;
namespace ReactivePubSub
{
/// <summary>
/// Console demo that pushes random stock price updates into a
/// <c>ReactivePubSubHandler</c> keyed by stock name.
/// </summary>
class Program
{
    /// <summary>A single price update for one stock.</summary>
    class ClientUpdate
    {
        public string StockName { get; set; }
        public float Price { get; set; }
    }

    /// <summary>
    /// Creates a handler with a 1000 ms sampling interval and feeds it 1000 random
    /// updates, pausing 200 ms after each one.
    /// </summary>
    static void Main()
    {
        // Topic selector: updates are grouped by the stock they belong to.
        Func<ClientUpdate, string> byStockName = update => update.StockName;

        // Action run by the handler for each sampled update.
        Action<ClientUpdate> printUpdate = update => Console.WriteLine(
            $"{DateTime.Now} Processed update for client #{update.StockName} Price={update.Price}");

        var handler = new ReactivePubSubHandler<ClientUpdate, string>(1000, byStockName, printUpdate);

        var random = new Random();
        var remaining = 1000;
        while (remaining-- > 0)
        {
            handler.OnClientRequest(NextUpdate(random));
            Thread.Sleep(200);
        }
    }

    /// <summary>
    /// Builds an update for a random stock "IBM_0".."IBM_9" with a random price
    /// between 100.00 and 109.99.
    /// </summary>
    static ClientUpdate NextUpdate(Random random)
    {
        // Draw the name before the price so the sequence of Random calls is stable.
        var stockName = "IBM_" + random.Next(0, 10);
        var price = (float)(random.Next(10000, 11000) / 100.0);

        return new ClientUpdate { StockName = stockName, Price = price };
    }
}
/// <summary>
/// Groups published items by topic key and invokes an action with the most recent
/// item of each topic, at most once per sampling interval per topic.
/// </summary>
/// <remarks>
/// Dispose the handler when it is no longer needed; otherwise the subscription and
/// the per-topic sampling timers stay alive.
/// NOTE(review): <see cref="OnClientRequest"/> forwards straight to a
/// <c>Subject&lt;T&gt;</c>; Rx expects OnNext calls to be serialized, so concurrent
/// publishers should synchronize externally - confirm against actual callers.
/// </remarks>
/// <typeparam name="TClientData">Type of the published item.</typeparam>
/// <typeparam name="TClientDataKey">Type of the topic key used to group items.</typeparam>
public class ReactivePubSubHandler<TClientData, TClientDataKey> : IDisposable
{
    // Entry point of the pipeline: every published item is pushed through this subject.
    private readonly Subject<TClientData> _clientUpdates = new Subject<TClientData>();

    // Handle for the running pipeline; kept so it can be torn down in Dispose.
    private readonly IDisposable _sampledActionsSubscription;

    private bool _disposed;

    /// <summary>
    /// Builds the sampling pipeline and subscribes <paramref name="pubSubAction"/> to it.
    /// </summary>
    /// <param name="sampleIntervalMs">Sampling interval per topic, in milliseconds. Must not be negative.</param>
    /// <param name="pubSubTopicSelector">Extracts the topic key from a published item.</param>
    /// <param name="pubSubAction">
    /// Invoked with the latest item of a topic once per interval in which that topic
    /// received at least one item. It is driven by the Rx sampling timer, so it
    /// typically does not run on the publishing thread.
    /// </param>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="sampleIntervalMs"/> is negative.</exception>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="pubSubTopicSelector"/> or <paramref name="pubSubAction"/> is null.
    /// </exception>
    public ReactivePubSubHandler(int sampleIntervalMs, Func<TClientData, TClientDataKey> pubSubTopicSelector, Action<TClientData> pubSubAction)
    {
        // Fail fast: otherwise a negative interval only surfaces later, inside the
        // pipeline, when the first item of a topic is published.
        if (sampleIntervalMs < 0)
        {
            throw new ArgumentOutOfRangeException(nameof(sampleIntervalMs), sampleIntervalMs, "Sample interval must not be negative.");
        }

        if (pubSubTopicSelector == null)
        {
            throw new ArgumentNullException(nameof(pubSubTopicSelector));
        }

        if (pubSubAction == null)
        {
            throw new ArgumentNullException(nameof(pubSubAction));
        }

        var sampleInterval = TimeSpan.FromMilliseconds(sampleIntervalMs);

        // One sampled stream per topic, merged back into a single stream of items.
        _sampledActionsSubscription = _clientUpdates
            .GroupBy(pubSubTopicSelector)
            .SelectMany(topic => topic.Sample(sampleInterval))
            .Subscribe(pubSubAction);
    }

    /// <summary>Publishes an item into the pipeline.</summary>
    /// <param name="clientData">The item to publish.</param>
    /// <exception cref="ObjectDisposedException">The handler has already been disposed.</exception>
    public void OnClientRequest(TClientData clientData)
    {
        _clientUpdates.OnNext(clientData);
    }

    /// <summary>Stops the pipeline and releases the subscription and the subject.</summary>
    public void Dispose()
    {
        Dispose(true);
        GC.SuppressFinalize(this);
    }

    /// <summary>Releases managed resources; safe to call more than once.</summary>
    /// <param name="disposing">True when called from <see cref="Dispose()"/>.</param>
    protected virtual void Dispose(bool disposing)
    {
        if (_disposed)
        {
            return;
        }

        if (disposing)
        {
            // Unsubscribe first so no action fires while the subject is being torn down.
            _sampledActionsSubscription.Dispose();
            _clientUpdates.Dispose();
        }

        _disposed = true;
    }
}
}
|