A trivial message loop using RabbitMQ in C#.NET

During the May 2011 meeting of the Melbourne ALT.NET group, 3 presenters each with their chosen functional language tackled a basic problem of transmitting a message along a chain, with the objective of all nodes contributing to a final output. Here’s some of their code in the the languages of; Erlang, Scala and F#.

As an introductory post to RabbitMQ for my blog I thought I would cover how you go about setting up a simple RabbitMQ server-client in an asynchronous fashion. In this post I’m only going to cover the abstract concept and introduce some terms, I’ll go into more specific detail in another post as to the technical details of using RabbitMQ. I presented this concept as a very quick 10 minute lightning talk at DDD Melbourne 2011.

So stay tuned for more RabbitMQ based posts (links will appear here when they’re complete).

The objective:

  1. Start with a message that contains a letter ‘A’
  2. Send this message off to a node (N)
  3. Have that node increment the letter, e.g. first node makes it ‘B’
  4. Then that node sends it off, and so on looping for M number of messages.

A Node:

Node Structure

The Algorithm:
My RabbitMQ and C# based solution (simplest version), this list will contain some RabbitMQ specific concepts that I’ll describe later in the post. This is the algorithm to create, wire up and kick off processing.

  1. Create the N clients SimpleRpcClient.
  2. Keep a reference to the 1st client, this is what we use to begin messaging.
  3. Create N Subscription elements.
  4. Create N SimpleRpcServer elements these are the actual nodes.
  5. Supply the second client onwards and subscibtion to the node
  6. Create a new Threading.Tasks.Task() for each node.
  7. To complete the loop, wire up the first client, to the last node.

Node Communication (click for larger view):
Each node, houses a client to continue on sending the message.

Node Communication

The Code:

IConnection connection;
IModel model;

char[] letters; //A-Z, repeating
for (x = 0; x < totalToCreate; x++)
{
    var nextLetter = letters[x];
    //this builds up a string in the format of comm.0.a, comm.0.b, etc
    var exchange = String.Format("comm.{0}.{1}", x, nextLetter); 
    model.ExchangeDeclare(exchange, ExchangeType.Direct);
    var queueName = model.EnsureQueue(exchange);
    subscriptions.Add(new Subscription(model, queueName));

    clients.Add(new SimpleRpcClient(model, queueName));
}

for (x = 0; x < totalToCreate; x++)
{
    //note the use of [x+1] on the clients, 
    server = new SimpleRpcServer(subscriptions[x], clients[x-1]);

    new Task(server.MainLoop).Start(); //MainLoop is a RabbitMQ concept
}

//Inside RpcServer
public override void HandleSimpleCast(bool isRedelivered, IBasicProperties requestProperties, byte[] body)
{
    var messageToSend = body.Deserialize().IncrementLetter();

    rpcClient.Cast(new BasicProperties(), messageToSend);
}

The code above had been simplified a bit more just to demonstrate the concept, please review the project code for further intricacies in it how it needs to operate.

Working demo code on BitBucket – https://bitbucket.org/NickJosevski/rabbitloop

*Note: you’ll have to excuse some of the roughness of the code as it stands at 29th of May, I haven’t had a chance to refactor it to be more elegant.

There is a long list of RabbitMQ concepts that are beyond the scope of this blog post, a new one will follow soon with my explanations of how they work:

  • IConnection
  • IModel
  • QueueHelper.EnsureQueue()
  • HandleSimpleCast
  • rpcClient.Cast()
  • Subscription
  • ExchangeDeclare()

Also check back I may have completed a Prezi on the subject.

Advertisements