A basic walkthrough of RabbitMQ using C#.NET examples

In my last post “A trivial message loop using RabbitMQ in C#.NET” I demonstrated a simple concept of a circular message queue using RabbitMQ. I chose not to complicate the post with explaining some fundamentals of RabbitMQ leaving that for this post.

As of writing the version of RabbitMQ I am using is 2.4.1.

Some of the concepts I introduced in the last post were, and I’ll break them down in a hopefully a logical flow as steps to take.

  1. IConnection
  2. IModel
  3. ExchangeDeclare()
  4. QueueHelper.EnsureQueue()
  5. Subscription
  6. SimpleRpcClient
  7. SimpleRpcClient.Cast()
  8. Your own SimpleRpcServer
  9. HandleSimpleCast

First if you’re looking to get setup on Windows head on over to the RabbitMQ official documentation and follow the steps.

Also before you continue reading I urge you to go have a look at Mike Hadlow’s EasyNeqQ API and accompanying blog posts.

Some other good blog posts I have stumbled upon after starting to write my version of this come from Simon Dixon, at the time of me writing this he had 1, 2, 3 part series

On to my take, I’ll be explaining RabbitMQ along the lines of how my simple demonstration application functions, as a reminder it’s available on BitBucket – https://bitbucket.org/NickJosevski/rabbitloop

Step 1 – The Connection

I am using ConnectionFactory with a local machine address to create the connection the client and server will use to build the next required component.

IConnection connection = new ConnectionFactory
  { 
      Address = "127.0.0.1" //localhost
  }.CreateConnection();

Step 2 – The Model

Using the connection create a model this is used to configure the rest of details on how communication will take place between the client and server.

IModel model = connection.CreateModel();

Step 3 – ExchangeDeclare()

Declaring an exchange is the concept of setting up a named path for communication, the second parameter offers options on how messages are to be delivered, in this example I’m sticking with a simple choice of ‘Direct’

var exchange = "my.exchange";
model.ExchangeDeclare(exchange, ExchangeType.Direct);

Step 4 – QueueHelper.EnsureQueue()

This is my own helper extension method to help perform a few tasks; creating a queueId, using that queueId to declare a queue on the model, and then bind the queue and exchange together, finally returning the newly created queueName.

public static class QueueHelper
{
    public static string EnsureQueue(this IModel ch, String exchangeName)
    {
        var queueId = String.Format("{0}.reply", exchangeName);
        var queueName = ch.QueueDeclare(queueId, false, false, false, null);
        ch.QueueBind(queueName, exchangeName, "", null);
        return queueName;
    }
}

Step 5 – Subscription

The subscription is supplied to the server logic and is created using the model and the queueName

var sub = Subscription(model, queueName);

Step 6 – SimpleRpcClient
Creating the client is just as simple, and we’re ready to send a message.

var client = new SimpleRpcClient(model, queueName);

Step 7 – SimpleRpcClient.Cast()

The Cast() method on the SimpleRpcClient is the asynchronous way to send a message, I want to keep this post more straight forward so I am excluding the message class and how to serialize it but those steps are very basic.

var myMsg = new SerializableClassYouHaveCreated().Serialize();

rpcClient.Cast(new BasicProperties(), myMsg);

Step 8 – NicksSimpleRpcServer

This is where things start to get a little more involved, but it’s relatively straight forward. In this approach I create my own RpcServer that extends SimpleRpcServer.

public class NicksSimpleRpcServer : SimpleRpcServer
{
    private readonly SimpleRpcClient rpcClient;

    public NicksSimpleRpcServer(Subscription subscription, SimpleRpcClient client)
    : base(subscription)
    {
        rpcClient = client;
    }

}

Step 9 – HandleSimpleCast

All I do now is override the method that handles asynchronous messages (it also needs to de-serialize the content form byte[] body). Just to follow along from my sample application the server also has a client private member so it can continue to forward a modified message on wards.

//as an override method inside NicksSimpleRpcServer:
public override void HandleSimpleCast(Boolean isRedelivered, IBasicProperties requestProperties, byte[] body)
{
    //deal with incoming message, create new message to forward
     rpcClient.Cast(new BasicProperties(), msgToForward);
}

This is where our story comes to an end… As a final step we need to get our server running…

Using the Task Parallel Library create a Task that kicks off another method of the RpcServer – MainLoop(). MainLoop simply sits there (blocks) and accepts incoming messages, each time an asynchronous arrives HandleSimpleCast will fire and the message will be processed.

var server = new NicksSimpleRpcServer(sub, client);

new Task(server.MainLoop).Start();

Hope this is easy to follow.

A trivial message loop using RabbitMQ in C#.NET

During the May 2011 meeting of the Melbourne ALT.NET group, 3 presenters each with their chosen functional language tackled a basic problem of transmitting a message along a chain, with the objective of all nodes contributing to a final output. Here’s some of their code in the the languages of; Erlang, Scala and F#.

As an introductory post to RabbitMQ for my blog I thought I would cover how you go about setting up a simple RabbitMQ server-client in an asynchronous fashion. In this post I’m only going to cover the abstract concept and introduce some terms, I’ll go into more specific detail in another post as to the technical details of using RabbitMQ. I presented this concept as a very quick 10 minute lightning talk at DDD Melbourne 2011.

So stay tuned for more RabbitMQ based posts (links will appear here when they’re complete).

The objective:

  1. Start with a message that contains a letter ‘A’
  2. Send this message off to a node (N)
  3. Have that node increment the letter, e.g. first node makes it ‘B’
  4. Then that node sends it off, and so on looping for M number of messages.

A Node:

Node Structure

The Algorithm:
My RabbitMQ and C# based solution (simplest version), this list will contain some RabbitMQ specific concepts that I’ll describe later in the post. This is the algorithm to create, wire up and kick off processing.

  1. Create the N clients SimpleRpcClient.
  2. Keep a reference to the 1st client, this is what we use to begin messaging.
  3. Create N Subscription elements.
  4. Create N SimpleRpcServer elements these are the actual nodes.
  5. Supply the second client onwards and subscibtion to the node
  6. Create a new Threading.Tasks.Task() for each node.
  7. To complete the loop, wire up the first client, to the last node.

Node Communication (click for larger view):
Each node, houses a client to continue on sending the message.

Node Communication

The Code:

IConnection connection;
IModel model;

char[] letters; //A-Z, repeating
for (x = 0; x < totalToCreate; x++)
{
    var nextLetter = letters[x];
    //this builds up a string in the format of comm.0.a, comm.0.b, etc
    var exchange = String.Format("comm.{0}.{1}", x, nextLetter); 
    model.ExchangeDeclare(exchange, ExchangeType.Direct);
    var queueName = model.EnsureQueue(exchange);
    subscriptions.Add(new Subscription(model, queueName));

    clients.Add(new SimpleRpcClient(model, queueName));
}

for (x = 0; x < totalToCreate; x++)
{
    //note the use of [x+1] on the clients, 
    server = new SimpleRpcServer(subscriptions[x], clients[x-1]);

    new Task(server.MainLoop).Start(); //MainLoop is a RabbitMQ concept
}

//Inside RpcServer
public override void HandleSimpleCast(bool isRedelivered, IBasicProperties requestProperties, byte[] body)
{
    var messageToSend = body.Deserialize().IncrementLetter();

    rpcClient.Cast(new BasicProperties(), messageToSend);
}

The code above had been simplified a bit more just to demonstrate the concept, please review the project code for further intricacies in it how it needs to operate.

Working demo code on BitBucket – https://bitbucket.org/NickJosevski/rabbitloop

*Note: you’ll have to excuse some of the roughness of the code as it stands at 29th of May, I haven’t had a chance to refactor it to be more elegant.

There is a long list of RabbitMQ concepts that are beyond the scope of this blog post, a new one will follow soon with my explanations of how they work:

  • IConnection
  • IModel
  • QueueHelper.EnsureQueue()
  • HandleSimpleCast
  • rpcClient.Cast()
  • Subscription
  • ExchangeDeclare()

Also check back I may have completed a Prezi on the subject.