Using azure storage queues
Azure offers two kinds of queuing mechanism viz. storage queues and service bus queues. In this blog post, I’ll explain how to use storage queues for sending and receiving messages.
For details on the difference between storage and service bus queues please refer this msdn documentation. It also gives guidance on when to use what.
Sending messages
In the sample code below, we are sending 10 messages to a storage queue.
public class FileUploadMessage { public string FilePath { get; set; } public int FileId { get; set; } } public class AzureStorageQueueSender { public class FileUploadMessage { public string FilePath { get; set; } public int FileId { get; set; } } public async Task SendMessagesAsync() { var storageAccount = CloudStorageAccount.Parse("UseDevelopmentStorage=true"); var queueClient = storageAccount.CreateCloudQueueClient(); var queueRef = queueClient.GetQueueReference("filequeue"); await queueRef.CreateIfNotExistsAsync(); for (int i = 0; i < 10; i++) { //simulate some processing by putting a delay while sending messages await Task.Delay(200 * i); var uploadMessage = new FileUploadMessage() { FileId = i, FilePath = "Something" }; var message = new CloudQueueMessage(JsonConvert.SerializeObject(uploadMessage)); await queueRef.AddMessageAsync(message); } } }
Code is self-explanatory. The only thing worth mentioning is that ‘CloudQueueMessage’ class accepts only a string argument so if you have a complex object then you need to serialize it and send.
Receiving messages
Below is the sample code for receiving messages.
public class FileUploadMessage { public string FilePath { get; set; } public int FileId { get; set; } } public class AzureStorageQueueReceiver { public async Task ReceiveMessagesAsync() { var storageAccount = CloudStorageAccount.Parse("UseDevelopmentStorage=true"); var queueClient = storageAccount.CreateCloudQueueClient(); var queueRef = queueClient.GetQueueReference("filequeue"); await queueRef.CreateIfNotExistsAsync(); while (true) { await queueRef.FetchAttributesAsync(); if (queueRef.ApproximateMessageCount > 0) { var message = await queueRef.GetMessageAsync(); //set the next visibility timeout await queueRef.UpdateMessageAsync(message, TimeSpan.FromSeconds(15), Microsoft.WindowsAzure.Storage.Queue.MessageUpdateFields.Content | Microsoft.WindowsAzure.Storage.Queue.MessageUpdateFields.Visibility); if (message == null) continue; var uploadMessage = JsonConvert.DeserializeObject<FileUploadMessage>(message.AsString); Console.WriteLine($"Received message : {message.AsString}"); //Delete the message after successfully processing it await queueRef.DeleteMessageAsync(message); } else { Console.WriteLine("No message on the queue"); await Task.Delay(1000); } } } }
The important point here is to understand how the message handling happens. Every time you pull a message from the queue you get the lease on the message for a specified time (by default it is 30 seconds).If you are not able to process the message during this time you should extend the lease else the message will appear again on the queue and would be available for processing. Once the message processing is successful the message should be deleted so that it does not reappear on the queue.
In the sample code we pull the message from the queue and in next line we update the visibility time out (i.e the time after which the message will reappear for processing on the queue based on how much time we need for processing).
Once the message has been processed successfully we delete the message. In case an exception occurs the message will automatically reappear after 45 seconds for reprocessing.
In this post, I showed a very basic example of sending and receiving using storage queues. In future posts, I’ll be writing more on specific aspects of working with storage queues. To know in-depth on azure storage accounts (i.e Azure tables ,blobs and queues) here is a very good course by alan smith.