A trivial message loop using RabbitMQ in C#.NET

During the May 2011 meeting of the Melbourne ALT.NET group, three presenters, each with their chosen functional language, tackled a basic problem: transmitting a message along a chain, with every node contributing to the final output. Here’s some of their code in Erlang, Scala and F#.

As an introductory RabbitMQ post for my blog, I thought I would cover how to set up a simple RabbitMQ server-client arrangement in an asynchronous fashion. In this post I’m only going to cover the abstract concept and introduce some terms; I’ll go into the technical details of using RabbitMQ in another post. I presented this concept as a very quick 10-minute lightning talk at DDD Melbourne 2011.

So stay tuned for more RabbitMQ based posts (links will appear here when they’re complete).

The objective:

  1. Start with a message that contains a letter ‘A’
  2. Send this message off to a node (N)
  3. Have that node increment the letter, e.g. first node makes it ‘B’
  4. That node then sends the message on to the next node, and so on, looping for M messages.
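
To make the objective concrete before any messaging is involved, here is a minimal in-memory sketch of the letter being passed along. The names LetterLoop, Increment and Run are hypothetical and not part of the demo project, and wrapping ‘Z’ back to ‘A’ is an assumption about what should happen once the alphabet runs out.

```csharp
using System;

public static class LetterLoop
{
    // Advances 'A'..'Z' by one letter; wrapping 'Z' back to 'A' is an assumption.
    public static char Increment(char letter) =>
        letter == 'Z' ? 'A' : (char)(letter + 1);

    // Simulates the message visiting `hops` nodes, each incrementing the letter once.
    public static char Run(char start, int hops)
    {
        var current = start;
        for (var i = 0; i < hops; i++)
        {
            current = Increment(current);
        }
        return current;
    }

    public static void Main()
    {
        Console.WriteLine(Run('A', 1));  // B
        Console.WriteLine(Run('A', 26)); // A (a full lap of the alphabet)
    }
}
```

In the RabbitMQ version each call to Increment happens inside a separate node, and the loop is replaced by messages travelling between queues.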

A Node:

Node Structure

The Algorithm:
My RabbitMQ and C# based solution (simplest version) follows. The list contains some RabbitMQ-specific concepts that I’ll describe in a follow-up post. This is the algorithm to create, wire up and kick off processing.

  1. Create the N clients (SimpleRpcClient).
  2. Keep a reference to the first client; this is what we use to begin messaging.
  3. Create N Subscription elements.
  4. Create N SimpleRpcServer elements; these are the actual nodes.
  5. Supply each node with its subscription and the next client (the second client onwards).
  6. Create a new Threading.Tasks.Task() for each node.
  7. To complete the loop, wire up the first client to the last node.
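
The wiring in steps 5 and 7 amounts to building a ring: node i forwards to client i + 1, and the last node forwards to the first client. The sketch below shows that shape with plain objects instead of RabbitMQ types; InMemoryNode, RingWiring, Build and VisitOrder are hypothetical names used only for illustration.

```csharp
using System;
using System.Collections.Generic;

public sealed class InMemoryNode
{
    public int Id { get; }
    public InMemoryNode Next { get; set; } // stands in for the client a node forwards to

    public InMemoryNode(int id) { Id = id; }
}

public static class RingWiring
{
    // Creates n nodes and points each one at its successor; the modulo
    // wraps the last node back to the first, which closes the loop.
    public static List<InMemoryNode> Build(int n)
    {
        var nodes = new List<InMemoryNode>();
        for (var i = 0; i < n; i++)
        {
            nodes.Add(new InMemoryNode(i));
        }
        for (var i = 0; i < n; i++)
        {
            nodes[i].Next = nodes[(i + 1) % n];
        }
        return nodes;
    }

    // Follows the Next references from the first node for `hops` visits.
    public static List<int> VisitOrder(List<InMemoryNode> nodes, int hops)
    {
        var visited = new List<int>();
        var current = nodes[0];
        for (var i = 0; i < hops; i++)
        {
            visited.Add(current.Id);
            current = current.Next;
        }
        return visited;
    }

    public static void Main()
    {
        var nodes = Build(3);
        Console.WriteLine(string.Join(",", VisitOrder(nodes, 4))); // 0,1,2,0
    }
}
```

The same modulo trick is one way to pick the client for each node when the real SimpleRpcServer instances are created.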

Node Communication:
Each node houses a client, which it uses to send the message on to the next node.

Node Communication

The Code:

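using System.Collections.Generic;
using System.Threading.Tasks;
using RabbitMQ.Client;
using RabbitMQ.Client.MessagePatterns;

IConnection connection; //opened elsewhere
IModel model;           //the channel created from the connection

char[] letters; //A-Z, repeating
var subscriptions = new List<Subscription>();
var clients = new List<SimpleRpcClient>();

for (var x = 0; x < totalToCreate; x++)
{
    var nextLetter = letters[x];
    //this builds up a string in the format of comm.0.a, comm.1.b, etc
    var exchange = String.Format("comm.{0}.{1}", x, nextLetter);
    model.ExchangeDeclare(exchange, ExchangeType.Direct);
    var queueName = model.EnsureQueue(exchange);
    subscriptions.Add(new Subscription(model, queueName));

    clients.Add(new SimpleRpcClient(model, queueName));
}

for (var x = 0; x < totalToCreate; x++)
{
    //note the use of [x + 1] on the clients: node x forwards to the next queue,
    //and the modulo wraps the last node back to the first client to close the loop
    var server = new SimpleRpcServer(subscriptions[x], clients[(x + 1) % totalToCreate]);

    new Task(server.MainLoop).Start(); //MainLoop is a RabbitMQ concept
}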

//Inside RpcServer
public override void HandleSimpleCast(bool isRedelivered, IBasicProperties requestProperties, byte[] body)
{
    var messageToSend = body.Deserialize().IncrementLetter();

    rpcClient.Cast(new BasicProperties(), messageToSend);
}
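
The handler above leans on two helpers, Deserialize() and IncrementLetter(), that are not shown. As a rough sketch only, they could be written as extension methods like the ones below. This assumes the message body is a single UTF-8 encoded letter and that ‘Z’ wraps back to ‘A’; the helpers in the actual project may well look different.

```csharp
using System;
using System.Text;

public static class MessageExtensions
{
    // Assumption: the body is UTF-8 text holding exactly one letter.
    public static string Deserialize(this byte[] body) =>
        Encoding.UTF8.GetString(body);

    // Returns the next letter, re-encoded as bytes ready to be sent on.
    public static byte[] IncrementLetter(this string message)
    {
        if (message.Length != 1 || message[0] < 'A' || message[0] > 'Z')
        {
            throw new ArgumentException("Expected a single letter A-Z.", nameof(message));
        }

        var next = message[0] == 'Z' ? 'A' : (char)(message[0] + 1);
        return Encoding.UTF8.GetBytes(next.ToString());
    }
}

public static class MessageDemo
{
    public static void Main()
    {
        var body = Encoding.UTF8.GetBytes("A");
        var forwarded = body.Deserialize().IncrementLetter();
        Console.WriteLine(Encoding.UTF8.GetString(forwarded)); // B
    }
}
```

Returning a byte[] from IncrementLetter() keeps the result directly usable as the body argument of rpcClient.Cast().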

The code above has been simplified a bit to demonstrate the concept; please review the project code for the further intricacies of how it needs to operate.

Working demo code on BitBucket – https://bitbucket.org/NickJosevski/rabbitloop

*Note: you’ll have to excuse some of the roughness of the code as it stands on the 29th of May; I haven’t had a chance to refactor it to be more elegant.

There is a long list of RabbitMQ concepts that are beyond the scope of this blog post; a new post will follow soon with my explanations of how they work:

  • IConnection
  • IModel
  • QueueHelper.EnsureQueue()
  • HandleSimpleCast
  • rpcClient.Cast()
  • Subscription
  • ExchangeDeclare()

Also check back, as I may have completed a Prezi on the subject.

3 thoughts on “A trivial message loop using RabbitMQ in C#.NET”

  2. I downloaded the code from BitBucket, and I think there may be some improvements to be made. First, the code “rpcClient.Cast(new BasicProperties(), msgToForward);” in the server always fires, even on the last run, which means that if the demo is run twice in Visual Studio, the second time there’s already a message in the queue, and the demo fires off immediately instead of waiting for a keypress.

    I fixed it by adding “model.QueuePurge( queueName );” right after EnsureQueue. Second, the program won’t exit unless all threads are shut down. This can be accomplished by adding “model.Close();connection.Close();” at the end of the Main() method.

    Also I think “if(pos == (run * aToZletters.Count())) exchange = “comm.0.a”;” should use the comparison “(pos == ((totalRuns * aToZletters.Count())-1))” instead. The intention is to loop the clients, I think.

    JR

    • Thanks for the suggestions Jrv.

      It may be quicker if you submit a pull request for those changes; otherwise I’m not sure when I may next get a chance to make those adjustments to that project.
