Azure Service Bus sessions

Service Bus sessions provide a way to group related messages so that they can be processed together. This is useful in many scenarios, e.g.

  • Processing requests from a single client / customer together
  • Multiple messages indicating completed steps of an ongoing process, where the receiver's responsibility is to process all steps to completion and update the process state at the end
  • Multiple messages, each carrying part of the information sent to a receiving system for processing

Basically, it's an easy way of de-multiplexing a huge message stream into streams of related messages, each handled by a single message receiver.

I will take a very simple example to demonstrate the concept: I am going to send 10 messages with the counter as the message text, and divide these messages into two sessions, Even and Odd, based on whether the message number is even or odd.
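The even/odd split can be pictured without touching Service Bus at all. The following is only an in-memory sketch of the idea: `SessionDemuxSketch` and `SplitBySession` are hypothetical names made up for illustration, and a plain dictionary stands in for the broker.

```csharp
using System;
using System.Collections.Generic;

public static class SessionDemuxSketch
{
    // Assigns each numbered message to "Session-Even" or "Session-Odd",
    // the same rule used for the session queue later in this post.
    // Assumption: a dictionary of lists stands in for the broker's sessions.
    public static Dictionary<string, List<string>> SplitBySession(int numberOfMessages)
    {
        var sessions = new Dictionary<string, List<string>>();
        for (int i = 0; i < numberOfMessages; i++)
        {
            string sessionId = (i % 2 == 0) ? "Session-Even" : "Session-Odd";
            if (!sessions.TryGetValue(sessionId, out var messages))
            {
                messages = new List<string>();
                sessions[sessionId] = messages;
            }
            // Within one session, messages keep the order in which they were added.
            messages.Add("message : " + i);
        }
        return sessions;
    }

    public static void Main()
    {
        foreach (var session in SplitBySession(10))
        {
            Console.WriteLine(session.Key + " -> " + string.Join(", ", session.Value));
        }
    }
}
```

One mixed stream goes in and one ordered list per session id comes out, which is what a session-aware receiver sees on the other side of the queue.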

 

No sessions

This is the case where we do not specify any sessions. Assuming you have a partitioned queue, your message processing would look something like below.

[Screenshot: console output when receiving without sessions]

The messages are not received in the order they were sent: a partitioned queue spreads its messages across several partitions, and ordering is not guaranteed across them.

Send messages:

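// Needs: using System; using System.Threading; using Microsoft.ServiceBus; using Microsoft.ServiceBus.Messaging;
static void SendMessagesWithoutSessionId(int numberOfMessages)
{
    var queueClient = QueueClient.CreateFromConnectionString(sbConnectionString, sbQueueName);
    for (int i = 0; i < numberOfMessages; i++)
    {
        // The body is just the counter; no SessionId is set on the message.
        var message = new BrokeredMessage("message : " + i);
        queueClient.Send(message);
        Console.WriteLine("sent : " + i);
    }
    queueClient.Close();
}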

Process messages:

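static void SimpleReceive()
{
    var queueClient = QueueClient.CreateFromConnectionString(sbConnectionString, sbQueueName);

    // ProcessMessage completes each message itself, so auto-complete is turned off
    // to avoid a second Complete() call after the callback returns.
    queueClient.OnMessage(x => ProcessMessage(x), new OnMessageOptions { AutoComplete = false });

    // Keep the message pump running until Enter is pressed.
    Console.ReadLine();
    queueClient.Close();
}

static void ProcessMessage(BrokeredMessage x)
{
    var msg = x.GetBody<string>();
    Console.WriteLine("Received : " + msg + " with session id : " + x.SessionId);
    Thread.Sleep(1000); // simulate one second of work
    x.Complete();
}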

 

With sessions

To enable sessions, we need to specify this while creating the queue (RequiresSession = true below); it cannot be changed afterwards.

Create a session-aware queue:

static QueueClient CreateSessionAwareQueue(string queueName)
{
    NamespaceManager nsManager = NamespaceManager.CreateFromConnectionString(sbConnectionString);
    QueueDescription qDescription = new QueueDescription(queueName)
    {
        AutoDeleteOnIdle = TimeSpan.FromMinutes(5),
        DuplicateDetectionHistoryTimeWindow = TimeSpan.FromSeconds(60),
        RequiresDuplicateDetection = true,
        // Sessions can only be switched on when the queue is created.
        RequiresSession = true
    };
    // Demo only: drop any existing queue with this name so it is recreated with sessions enabled.
    if (nsManager.QueueExists(queueName))
    {
        nsManager.DeleteQueue(queueName);
    }
    nsManager.CreateQueue(qDescription);
    return QueueClient.CreateFromConnectionString(sbConnectionString, queueName);
}

Sessions are set on each message through the SessionId property of the brokered message. It's simple: give the same session id to every message you want to be part of the same session. For demo purposes, I am just setting the session to even or odd based on the counter.

Send messages with a session:

static void SendMessagesWithSessionId(int numberOfMessages)
{
    var queueClient = CreateSessionAwareQueue("sessionqueue");
    for (int i = 0; i < numberOfMessages; i++)
    {
        var message = new BrokeredMessage("message : " + i);

        if (i % 2 == 0)
            message.SessionId = "Session-Even";
        else
            message.SessionId = "Session-Odd";

        queueClient.Send(message);
        Console.WriteLine("sent : " + i);
    }
    queueClient.Close();
}

There are multiple ways to handle session messages while processing, but in my example I will use a session handler, which I feel is the most elegant and the one you would mostly be using in your actual projects.

Define a session handler:

public class MessageSessionHandler : IMessageSessionHandler
{
    // Identifies this handler instance in the console output.
    private Guid messageHandlerId = Guid.NewGuid();

    public void OnCloseSession(MessageSession session)
    {
        Console.WriteLine($"Session closed - Handler Id {messageHandlerId}");
    }

    public void OnMessage(MessageSession session, BrokeredMessage message)
    {
        var msg = message.GetBody<string>();
        Console.WriteLine($"Received : {msg} from session : {message.SessionId} handled by : {messageHandlerId} ");
        Thread.Sleep(1000); // simulate one second of work
        // AutoComplete is off in the handler options, so complete explicitly.
        message.Complete();
    }

    public void OnSessionLost(Exception exception)
    {
        Console.WriteLine($"Session lost - Handler Id {messageHandlerId} due to exception {exception.Message}");
    }
}

Register the session handler:

static void ProcessMessageWithSessionHandler()
{
    var queueClient = QueueClient.CreateFromConnectionString(sbConnectionString, "sessionqueue");
    var sessionOptions = new SessionHandlerOptions()
    {
        AutoComplete = false,
        AutoRenewTimeout = TimeSpan.FromSeconds(30),
        MaxConcurrentSessions = 1, // handle one session at a time
        MessageWaitTimeout = TimeSpan.FromSeconds(10)
    };

    // The caller has to keep the process alive (e.g. with Console.ReadLine()) while messages are handled.
    queueClient.RegisterSessionHandler(typeof(MessageSessionHandler), sessionOptions);
}

If you run this code, the output looks something like below.

[Screenshot: console output with one session processed at a time]

So first one session is acquired and the messages from that session are processed, and then the next one.

Imagine a huge stream of messages with different session ids. Now you can clearly see how sessions help us de-multiplex a stream of messages.

But looking at the above screenshot, a natural question comes to mind: is it possible to handle multiple sessions in parallel or concurrently? The answer is yes.

 

With concurrent sessions

It is very simple to implement concurrent processing for sessions. If you look at the code snippet above for registering the session handler, the SessionHandlerOptions class has a property called MaxConcurrentSessions, which is currently set to 1.

We can simply change this, and you will see that your sessions are processed concurrently. The ideal number depends purely on your specific scenario. In this example I'll just set it to 2 so that both of my sessions can be processed in parallel. Below is the result.

[Screenshot: console output with both sessions processed concurrently]

Notice that even and odd messages are still handled by separate session handlers, as indicated by the "handled by" guid, but each session handler is now invoked on its own thread.
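The behaviour can be imitated in memory to make it easier to reason about. What follows is a rough sketch under stated assumptions, not how the Service Bus client is implemented: `ConcurrentSessionsSketch` and `Process` are hypothetical names, a semaphore plays the role of MaxConcurrentSessions, and each session is drained by exactly one task so the order inside a session is preserved.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class ConcurrentSessionsSketch
{
    // Drains every session on its own task, but lets at most
    // maxConcurrentSessions of them run at the same time.
    // Returns the handled messages per session, in handling order.
    public static IDictionary<string, List<string>> Process(
        IDictionary<string, List<string>> sessions, int maxConcurrentSessions)
    {
        var processed = new ConcurrentDictionary<string, List<string>>();
        using (var gate = new SemaphoreSlim(maxConcurrentSessions))
        {
            var tasks = sessions.Select(session => Task.Run(() =>
            {
                gate.Wait(); // wait for a free session slot
                try
                {
                    var handled = new List<string>();
                    foreach (var message in session.Value)
                    {
                        Thread.Sleep(10); // simulate work
                        handled.Add(message);
                    }
                    processed[session.Key] = handled;
                }
                finally
                {
                    gate.Release();
                }
            })).ToArray();
            Task.WaitAll(tasks);
        }
        return processed;
    }

    public static void Main()
    {
        var sessions = new Dictionary<string, List<string>>
        {
            { "Session-Even", new List<string> { "message : 0", "message : 2", "message : 4" } },
            { "Session-Odd", new List<string> { "message : 1", "message : 3", "message : 5" } }
        };
        foreach (var session in Process(sessions, 2))
        {
            Console.WriteLine(session.Key + " -> " + string.Join(", ", session.Value));
        }
    }
}
```

With a limit of 1 the sessions are drained one after the other, and with 2 they overlap. In both cases the messages of a single session stay in their original order, because only one task ever touches a given session.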

Hope this post makes the concept of sessions a little easier to understand.
To learn more about Service Bus brokered messaging, refer to this Pluralsight course.
