Integrating Amazon Simple Queue Service (SQS) with .NET 6 | Exploring asynchronous message handling.

May 15, 2023

When you work on a microservice architecture, you will eventually need to communicate with other services. You can call these services in one of two ways:

  1. Synchronously: call the REST API of the other service and exchange data through its JSON response.
  2. Asynchronously: use a third-party message queue service (such as RabbitMQ, Apache Kafka, or Amazon Simple Queue Service) and handle the data when a message arrives.

The asynchronous path is often the better choice, because synchronous communication has several disadvantages, such as:

  1. If a service URL changes, you need to update every other service that calls it with the new URL.
  2. If your services make chained calls, the response times add up, which can make your application unresponsive.

Today I’m going to show you how to implement asynchronous communication in your .NET 6 service applications. We are going to use Amazon Simple Queue Service (SQS), a fully managed message queuing service for microservices, distributed systems, and serverless applications. First, we need a queue. You can create one by following this guide: https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/step-create-queue.html

I have created a queue called neuron-blast-queue. Next, let’s open Visual Studio 2022 and create a new Web API project.

Now, let’s create a Services folder with an interface IMessageHandler.cs and an implementation class MessageHandler.cs. We also need a DTO class MessageRequest.cs, which will hold our request information. Declare a method Task HandleMessage(MessageRequest request) in the IMessageHandler interface. Our code will look like this:

// MessageRequest.cs
namespace AwsSqsDemo.Services
{
    public class MessageRequest
    {
        public RequestTypes RequestType { get; set; }
        public string RequestData { get; set; } = default!;
    }

    public enum RequestTypes
    {
        // Placeholder used by AwsSqsClient.ReceiveAsync when no message was received
        Empty = 0,
        UserCreation = 1
    }
}

// IMessageHandler.cs
namespace AwsSqsDemo.Services
{
    public interface IMessageHandler
    {
        Task HandleMessage(MessageRequest request);
    }
}

// MessageHandler.cs
namespace AwsSqsDemo.Services
{
    public class MessageHandler : IMessageHandler
    {
        public async Task HandleMessage(MessageRequest request)
        {
            throw new NotImplementedException();
        }
    }
}
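
To make the message format concrete, here is a minimal sketch of how a MessageRequest could travel through the queue as JSON. It uses a trimmed copy of the DTO so that it compiles on its own; the helper class MessageRequestJsonDemo and its method names are made up for this illustration and are not part of the project.

```csharp
using System;
using System.Text.Json;

// Trimmed copy of the DTO from this post, repeated so the sketch is self-contained.
public enum RequestTypes
{
    UserCreation = 1
}

public class MessageRequest
{
    public RequestTypes RequestType { get; set; }
    public string RequestData { get; set; } = default!;
}

// Hypothetical helper, only for illustration.
public static class MessageRequestJsonDemo
{
    // Serializes the DTO the way a publisher would before sending it to the queue.
    public static string ToJson(MessageRequest request) =>
        JsonSerializer.Serialize(request);

    // Reads the DTO back, as a consumer would. Returns null if the body is the JSON literal null.
    public static MessageRequest? FromJson(string json) =>
        JsonSerializer.Deserialize<MessageRequest>(json);
}
```

With the default serializer options, a request of type UserCreation with RequestData set to "demo" becomes {"RequestType":1,"RequestData":"demo"}: the enum is written as its numeric value and the property names keep their casing.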

Okay, now let’s add the AWSSDK.SQS NuGet package to our project (for example, by running dotnet add package AWSSDK.SQS in the project folder).

Let’s create a class AwsSqsClient.cs, which will handle all our SQS-related tasks such as sending and receiving messages:

using Amazon;
using Amazon.Runtime;
using Amazon.SQS;
using Amazon.SQS.Model;
using AwsSqsDemo.Services;
using System.Text.Json;

namespace AwsSqsDemo.Utils
{
    public class AwsSqsClient
    {
        private readonly AmazonSQSClient _client;

        public AwsSqsClient()
        {
            // These values, like other configuration values, should be retrieved from appsettings.json.
            // They are hardcoded here for demo purposes.
            var awsCreds = new BasicAWSCredentials("AccessKey", "SecretKey");
            _client = new AmazonSQSClient(awsCreds, RegionEndpoint.APSoutheast1);
        }

        public async Task PublishAsync(MessageRequest messageBody)
        {
            try
            {
                var request = new SendMessageRequest()
                {
                    QueueUrl = "neuron-blast-queue-url",
                    MessageBody = JsonSerializer.Serialize(messageBody)
                };

                var response = await _client.SendMessageAsync(request);

                if (response.HttpStatusCode == System.Net.HttpStatusCode.OK)
                {
                    Console.WriteLine($"Message added to queue: {response.MessageId}");
                }
                else
                {
                    Console.WriteLine($"Message {response.MessageId} was not sent to queue. Please check");
                }
            }
            catch (Exception ex)
            {
                Console.WriteLine(ex.Message);
            }
        }

        // Returns the next message from the queue, or a request of type Empty if none was received
        public async Task<MessageRequest> ReceiveAsync(CancellationToken cancellationToken)
        {
            var responseObject = new MessageRequest() { RequestType = RequestTypes.Empty };

            try
            {
                var request = new ReceiveMessageRequest()
                {
                    QueueUrl = "neuron-blast-queue-url",
                };

                var response = await _client.ReceiveMessageAsync(request, cancellationToken);

                if (response.HttpStatusCode == System.Net.HttpStatusCode.OK)
                {
                    var message = response.Messages.FirstOrDefault();
                    if (message != null)
                    {
                        if (!string.IsNullOrEmpty(message.Body))
                        {
                            responseObject = JsonSerializer.Deserialize<MessageRequest>(message.Body) ?? responseObject;
                        }

                        // Delete the message so it is not delivered again
                        await _client.DeleteMessageAsync("neuron-blast-queue-url", message.ReceiptHandle, cancellationToken);
                    }
                }
                else
                {
                    Console.WriteLine("Message could not be received. Please check");
                }
            }
            catch (Exception ex)
            {
                Console.WriteLine(ex.Message);
            }

            return responseObject;
        }
    }
}
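
As the comment in the constructor says, the queue URL and region belong in configuration rather than in code. The sketch below shows one possible way to bind them from an "Sqs" section of appsettings.json. The SqsSettings class, its property names, and the section name are assumptions made for this example; only IConfiguration, GetSection, and Get<T> come from Microsoft.Extensions.Configuration.

```csharp
using System.Collections.Generic;
using Microsoft.Extensions.Configuration;

// Hypothetical settings class; the property names are assumptions for this sketch.
// A matching appsettings.json section could look like:
//   "Sqs": { "QueueUrl": "neuron-blast-queue-url", "Region": "ap-southeast-1" }
public class SqsSettings
{
    public string QueueUrl { get; set; } = "";
    public string Region { get; set; } = "";
}

// Hypothetical helper, only for illustration.
public static class SqsSettingsLoader
{
    // Binds the "Sqs" section of any IConfiguration source (appsettings.json, environment variables, ...).
    // Falls back to empty settings when the section is missing.
    public static SqsSettings Load(IConfiguration configuration) =>
        configuration.GetSection("Sqs").Get<SqsSettings>() ?? new SqsSettings();
}
```

In a Web API project, SqsSettingsLoader.Load(builder.Configuration) would return the values from appsettings.json. The region string could then be turned into an endpoint with RegionEndpoint.GetBySystemName, and the credentials could come from the SDK's default credential chain instead of BasicAWSCredentials, although neither change is part of the original demo.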

Now we need a background job processor that listens to the SQS queue and handles our requests whenever a message is available. I’m going to create a class Worker.cs which inherits from the framework class BackgroundService (an IHostedService implementation). This is our code:

using AwsSqsDemo.Services;
using AwsSqsDemo.Utils;

namespace AwsSqsDemo
{
    public class Worker : BackgroundService
    {
        private readonly AwsSqsClient _client;
        private readonly IServiceScopeFactory _scopeFactory;

        public Worker(IServiceScopeFactory scopeFactory)
        {
            _client = new AwsSqsClient();
            _scopeFactory = scopeFactory;
        }

        protected override async Task ExecuteAsync(CancellationToken cancellationToken)
        {
            while (!cancellationToken.IsCancellationRequested)
            {
                var response = await _client.ReceiveAsync(cancellationToken);

                try
                {
                    // Resolve the handler from a fresh scope for every message
                    using (var scope = _scopeFactory.CreateScope())
                    {
                        var messageHandler = scope.ServiceProvider.GetRequiredService<IMessageHandler>();
                        await messageHandler.HandleMessage(response);
                    }
                }
                catch (Exception ex)
                {
                    Console.WriteLine(ex.Message);
                }
            }
        }
    }
}
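
The loop above calls ReceiveAsync again as soon as the previous call returns. Unless the queue itself is configured with a receive wait time, an empty queue answers immediately, so the worker may send many requests that return nothing. One option is long polling, sketched below with the WaitTimeSeconds and MaxNumberOfMessages properties of ReceiveMessageRequest. The factory class and method name are hypothetical, and the block needs the AWSSDK.SQS package to compile.

```csharp
using Amazon.SQS.Model;

// Hypothetical helper, only for illustration.
public static class ReceiveRequestFactory
{
    // Builds a receive request that waits up to 20 seconds (the SQS maximum)
    // for a message to arrive instead of returning immediately.
    public static ReceiveMessageRequest CreateLongPolling(string queueUrl) =>
        new ReceiveMessageRequest
        {
            QueueUrl = queueUrl,
            MaxNumberOfMessages = 1, // the demo handles one message per call
            WaitTimeSeconds = 20
        };
}
```

ReceiveAsync could pass ReceiveRequestFactory.CreateLongPolling("neuron-blast-queue-url") to ReceiveMessageAsync instead of building the request inline.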

And this is the code for the HandleMessage method:

namespace AwsSqsDemo.Services
{
    public class MessageHandler : IMessageHandler
    {
        public async Task HandleMessage(MessageRequest request)
        {
            try
            {
                switch (request.RequestType)
                {
                    case RequestTypes.UserCreation:
                        // Handle user creation logic here
                        break;
                }
            }
            catch (Exception ex)
            {
                Console.WriteLine(ex.Message);
            }
        }
    }
}
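
The user creation branch is left empty in this post. As a rough sketch of what its first step might be, the code below turns RequestData back into a typed object. UserCreationPayload and UserCreationParser are hypothetical names; the Name property mirrors the anonymous object that the test controller later in this post puts into RequestData.

```csharp
using System;
using System.Text.Json;

// Hypothetical payload type matching the { Name = ... } object sent by the test controller.
public class UserCreationPayload
{
    public string Name { get; set; } = "";
}

// Hypothetical helper, only for illustration.
public static class UserCreationParser
{
    // Returns the parsed payload, or null when RequestData is empty or is not valid JSON for this type.
    public static UserCreationPayload? Parse(string? requestData)
    {
        if (string.IsNullOrWhiteSpace(requestData))
        {
            return null;
        }

        try
        {
            return JsonSerializer.Deserialize<UserCreationPayload>(requestData);
        }
        catch (JsonException)
        {
            // Malformed payloads are reported as null instead of crashing the worker loop
            return null;
        }
    }
}
```

Inside the UserCreation case, the handler could call UserCreationParser.Parse(request.RequestData) and skip the message when the result is null.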

Now, let’s send a message to the queue. For testing purposes, I’m sending a message from UserController.cs:

using AwsSqsDemo.Services;
using AwsSqsDemo.Utils;
using Microsoft.AspNetCore.Mvc;
using System.Text.Json;

namespace AwsSqsDemo.Controllers
{
    [Route("api/[controller]")]
    [ApiController]
    public class UserController : ControllerBase
    {
        [HttpPost]
        public async Task<IActionResult> CreateUser()
        {
            try
            {
                var client = new AwsSqsClient();
                await client.PublishAsync(new MessageRequest()
                {
                    RequestType = RequestTypes.UserCreation,
                    RequestData = JsonSerializer.Serialize(new { Name = "Bishal Sarker" })
                });
                return Ok();
            }
            catch (Exception ex)
            {
                return BadRequest(ex.Message);
            }
        }
    }
}

Finally, register all the services in Program.cs.
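
The original listing for this step is not reproduced here, so the following is only a sketch of what the registration could look like with the types defined in this post. The scoped lifetime for IMessageHandler is an assumption, chosen because Worker resolves the handler from a scope it creates itself.

```csharp
// Program.cs (sketch; assumes the Worker, IMessageHandler and MessageHandler types from this post)
using AwsSqsDemo;
using AwsSqsDemo.Services;

var builder = WebApplication.CreateBuilder(args);

// MVC controllers, needed for UserController
builder.Services.AddControllers();

// The handler that Worker resolves from its own scope for each message
builder.Services.AddScoped<IMessageHandler, MessageHandler>();

// Starts Worker.ExecuteAsync in the background when the application starts
builder.Services.AddHostedService<Worker>();

var app = builder.Build();

app.MapControllers();

app.Run();
```

AwsSqsClient is not registered here because both Worker and UserController create it with new in this demo.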

That’s all, people! Let me know your thoughts in the comments. Thanks.