An NServiceBus backplane for SignalR
[UPDATE February 20th 2014: I have updated this post and the sample to reflect SignalR version 2.0.2, and NServiceBus 4.4.0]
On March 7th 2013 I did a presentation on Microsoft Techdays NL 2013 about SignalR. This is a great framework for real time messaging in websites and other .NET clients. It also works great on mobile devices. Check out my slides in my previous post here.
In this presentation, I used a Production Monitor application that runs NServiceBus in the back end to push events to our SignalR gateway. Schematically, this looks like this:
This Production Monitor gathers info from our production application at De Vries Workforce Management, a SaaS workforce planning platform for large retail organizations. Besides technical data (are our databases up?), we also gather functional data (did all shops deliver turnover data in time, did all clock times come in?).
Of course there are all kinds of monitoring applications available in the market, so why build it ourselves? First of all, our application back end is comprised of NServiceBus services, publishing all kinds of events to form our business process. By running our monitoring application as an NServiceBus service, it can also listen to those events and do its checks accordingly. The second reason for me as an architect was that this case serves as a proof of concept and exploration on how NServiceBus combines with SignalR to provide real time communication to front ends. Something we’re contemplating on putting in our application as well in due time.
Scaling out SignalR with NServiceBus
The diagram above shows how NServiceBus publishes to a SignalR Gateway, who in turn sends the information to clients connected over the web. I’ll do a separate blogpost on that later. I did a presentation on this with my lead dev and partner in crime Mark Taling at the Dutch SDN Event on March 18th in Zeist. Here are our slides from that session:
Playing with SignalR and NServiceBus together made me think about other scenario’s as well. One of the aspects that has been very nicely solved in SignalR is its scale-out scenario. Consider the following setup:
In a web farm that uses three servers running a SignalR gateway, with a load balancer in front, messages from one client connected to Server A, won’t be delivered to other clients what were routed to a different web server automatically. Server B wouldn’t know about messages sent through Server A without some extra help. For this, SignalR uses a so called “backplane”. Now, the backplane in SignalR is pluggable. Microsoft offers three possible backplanes out of the box: Redis, SqlServer and ServiceBus (built on the Azure/Windows ServiceBus).
Just for fun, I decided to build such a backplane using NServiceBus. You can find it here on GitHub.
Building a backplane for SignalR is not so difficult. Basically, you’ll need three main parts: the ScaleoutMessageBus, a Receiver, and the Backplane server process.
The ScaleoutMessageBus
You’ll need a class that inherits from Microsoft.AspNet.SignalR.ScaleoutMessageBus, which serves as the SignalR plugin for accessing the backplane.
public class NServiceBusMessageBus : ScaleoutMessageBus { internal static IBus Bus; public NServiceBusMessageBus(IDependencyResolver resolver, IBus busInstance, ScaleoutConfiguration configuration) : base(resolver, configuration) { Bus = busInstance; Configure.Instance.Configurer.ConfigureComponent<Receiver>(DependencyLifecycle.InstancePerCall) .ConfigureProperty((r) => r.SignalRMessageBus, this); // By default, there is only 1 stream in this NServiceBus backplane, and we'll open it here Open(0); } protected override Task Send(int streamIndex, IList<Message> messages) { return Task.Factory.StartNew(() => { ScaleoutMessage msg = new ScaleoutMessage(messages); Bus.Send<DistributeMessages>(m => { m.Payload = msg.ToBytes(); m.StreamIndex = streamIndex; }); }); } new internal void OnReceived(int streamIndex, ulong id, ScaleoutMessage messages) { base.OnReceived(streamIndex, id, messages); } }
I decided to have the constructor receive an instance of IBus from the outside. This way, the hosting application can decide for itself how NServiceBus is initialized (what transport to use, what dependency injection framework, etc.). Also, this enables the bus that might already be present in the application to be reused. Furthermore, you can specify some extra configuration for the ScaleoutMessageBus, in the form of a ScaleoutConfiguration object. For the base class, this contains only a MaxQueueLength property, but you could subclass this for your custom ScaleoutMessageBus and add more properties specific to your configuration. I decided to let the hosting application decide on the configuration, and thus pass it into the constructor as well.
SignalR will call the Send() method on this class as soon as a message needs to be sent to a group of clients. My implementation simply uses an NServiceBus command to wrap the payload, and send it via NServiceBus to the backplane server. SignalR wraps messages inside a ScaleoutMessage, which has convenient helper methods for serializing them to Byte[], so I used that to put the data on the bus.
Furthermore, I made the OnReceived method available to other classes within the same assembly via a method override. The reason for this is that the Receiver should be able to invoke OnReceived on the ScaleoutMessageBus base class, which is protected. In previous versions, I had to make sure that ScaleoutMessageBus.OnReceived could never be called in parallel, i.e. there can never be two invocations of OnReceived at the same time, so these had to be queued up. In 1.1.2, this seems to be handled by SignalR itself, so I can safely pass the call to the base class implementation.
The Receiver
The Receiver is a separate class, which is a straightforward NServiceBus message handler:
public class Receiver: IHandleMessages<MessagesAvailable> { public NServiceBusMessageBus SignalRMessageBus { get; set; } public void Handle(MessagesAvailable message) { var messages = ScaleoutMessage.FromBytes(message.Payload); if (SignalRMessageBus != null) { SignalRMessageBus.OnReceived(message.StreamIndex, message.PayloadId, messages); } } }
The SignalRMessageBus property is wired up by NServiceBus’s dependency injection framework. As soon as the handler is instantiated, a backpointer to the ScaleoutMessageBus is put into this property.
The handler is pretty straightforward: deserializing the Payload into a ScaleoutMessage object and passing it to the OnReceived handler. The null check is done because the handler might start receiving messages left in the queue before the SignalR initialized its ScaleoutMessageBus. Yes, this means that some messages get lost, so this could be a bit more robust. Possibly by deferring the message, or using second level retries.
The Backplane server process
The backplane itself is quite simple as well… We basically need an NServiceBus host running an endpoint with the SignalRMessageDispatcher handler. This handler receives DistributeMessages commands from any SignalR Gateway, and re-publishes the payload using Bus.Publish(). Note that this handler also makes sure that the PayloadId is unique. This is required by SignalR. I just copied this behavior from the SqlServer backplane, and it seems to work fine.
public class SignalRMessageDispatcher: IHandleMessages<DistributeMessages> { private static ulong _payloadId = 0; public IBus Bus { get; set; } public void Handle(DistributeMessages message) { var evt = new MessagesAvailable() { Payload = message.Payload, StreamIndex = message.StreamIndex, PayloadId = (ulong) Interlocked.Increment(ref _payloadId) }; Bus.Publish(evt); } }
You’ll just need to create an NServiceBus Endpoint (e.g. just use the generic host) and run this message handler in it. I’ve added NServiceBus.SignalR.BackplaneService assembly with an EndpointConfig to serve as a host, but you can put the handler in your own host if you will, or just tweak the NServiceBus configuration in my sample to suit your needs.
Wiring up
Now that we’ve got a ScaleoutMessageBus implementation based on NServiceBus and a backplane process, we’ll need to wire these up. First, the convention for wiring up the ScaleoutMessageBus in SignalR is to use an extension method for IDependencyResolver (the interface SignalR uses for dependency injection). This is what the UseNServiceBus() method in the DepencencyResolverExtentions class does:
public static class DependencyResolverExtensions { /// <summary> /// Use NServiceBus backplane for SignalR. /// </summary> /// <param name="resolver">The dependency resolver.</param> /// <param name="busInstance">The instance of the NServiceBus IBus instance inside the current host.</param> /// <param name="configuration">Scaleout configuration parameters to be used by SignalR.</param> /// <returns>The dependency resolver.</returns> public static IDependencyResolver UseNServiceBus(this IDependencyResolver resolver, IBus busInstance, ScaleoutConfiguration configuration) { var bus = new Lazy<NServiceBusMessageBus>(() => new NServiceBusMessageBus(resolver, busInstance, configuration)); resolver.Register(typeof(IMessageBus), () => bus.Value); return resolver; } }
You’ll use this extension method when initializing SignalR. As of SignalR 2.0.0, this is done through a Startup class:
public class Startup { public IBus Bus; public void Configuration(IAppBuilder app) { // Any connection or hub wire up and configuration should go here app.MapSignalR(); Bus = Configure .With() .DefaultBuilder() .UseTransport<Msmq>() .UseInMemoryTimeoutPersister() .UnicastBus() .LoadMessageHandlers() .CreateBus() .Start(); var config = new ScaleoutConfiguration() { MaxQueueLength = 100 }; // Or whatever you want GlobalHost.DependencyResolver.UseNServiceBus(Bus, config); } }
The NServiceBusMessageBus subscribes the Receiver message handler to the MessagesAvailable event. For this, it needs to know the endpoint address of the backplane endpoint. We simply put this in the (web).config of the SignalR Gateway host:
<configuration> <configSections> <section name="UnicastBusConfig" type="NServiceBus.Config.UnicastBusConfig, NServiceBus.Core" /> <section name="MessageForwardingInCaseOfFaultConfig" type="NServiceBus.Config.MessageForwardingInCaseOfFaultConfig, NServiceBus.Core" /> </configSections> <UnicastBusConfig> <MessageEndpointMappings> <!-- the endpoint on which the backplane is listening for commands --> <!-- SignalR will subscribe to new messages via that endpoint --> <add Messages="SignalR.NServiceBus" Endpoint="signalr.nservicebus.backplaneservice" /> </MessageEndpointMappings> </UnicastBusConfig> <MessageForwardingInCaseOfFaultConfig ErrorQueue="error" /> </configuration>
To test this, just create a SignalR web application that uses the backplane. Next, copy the finished application to a second directory and map it as an extra application in IIS. Be sure to give it its own input queue. Start up the backplane process and fire up both website instances. You’ll see that messages sent to/from one website instance, are also delivered to the other.
Conclusion
This was a fun exercise to show how to extend SignalR with your own backplane to support a scale out scenario. The question is how scalable the backplane process itself is. The Azure ServiceBus backplane for example can scale out itself by adding more topics. These are sharded across multiple ServiceBus instances, something not supported in my version. It all depends on how massive your messaging needs are. I have not looked into a way of increasing the topic count (i.e. the StreamCount) in this NServiceBus version for avoiding contention. I’d be interested in ways to do so.
Nevertheless, this could be a nice starting point for using NServiceBus in your application for this purpose.