EasyNetQ

Pub/Sub messaging using EasyNetQ

In my previous RabbitMQ blog post, we explored implementing the pub/sub messaging model with RabbitMQ to enhance communication in applications. We discovered that this process entails tasks such as managing connections, serializing/deserializing messages, enabling microservices to subscribe to topics, binding and configuring queues, and addressing redundant code. To simplify these tasks, we can use EasyNetQ, a .NET Client API that acts as a wrapper over the native RabbitMQ client. EasyNetQ simplifies development and cuts down on the complexity of using RabbitMQ directly. This blog post will focus on EasyNetQ's benefits over standard RabbitMQ and guide you through its implementation.

Key Features of EasyNetQ

  1. Simplified API: Offers a more user-friendly API than RabbitMQ's .NET client, easing common tasks like message publishing and subscribing.

  2. Automatic Serialization/Deserialization: Automatically converts messages to and from JSON, eliminating the need for extra coding.

  3. Easy Configuration: Streamlines the RabbitMQ setup process, great for beginners or those who want a quick configuration.

  4. Support for Advanced Patterns: Accommodates complex messaging patterns, including RPC and topic-based publishing, with little configuration.

  5. Robust Error Handling and Resilience: Includes features such as automatic reconnection after a connection failure, which improve the stability of your messaging system.

  6. Asynchronous Programming Support: Designed for modern .NET async programming, aiding in building scalable, efficient applications.

  7. Active Community and Documentation: Benefits from an active community and comprehensive documentation.

  8. Reduced Complexity: By abstracting the complexities of RabbitMQ, EasyNetQ allows developers to focus more on business logic rather than the intricacies of the messaging infrastructure.

Let's see how we can implement pub/sub messaging using EasyNetQ.

EasyNetQ follows two simple conventions:

  1. Messages should be represented by .NET types.
  2. Messages should be routed by their .NET type.

With these conventions in place, the implementation takes five steps.

  1. Install EasyNetQ

dotnet add package EasyNetQ

  2. Create an instance of IBus to connect to RabbitMQ

var bus = RabbitHutch.CreateBus("host=localhost");

The recommended approach is to keep a single IBus instance for the lifetime of the application and dispose of it when the application shuts down.

  3. Create an instance of the User message type

A message is simply a public .NET class.

// User message type
public class User
{
    public string Name { get; set; }
    public string Email { get; set; }
    public int Age { get; set; }
}

// Instance of the User message
var message = new User
{
    Name = "Sejal Shah",
    Email = "sejal.shah123@gmail.com",
    Age = 20
};

  4. Publish the User message using the PublishAsync method on the bus's PubSub API

await bus.PubSub.PublishAsync(message);

The publisher finishes its task once it sends the message and doesn't need to know who is subscribing. The subscriber just has to listen for the message type.
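
The connection, message, and publish snippets above can be combined into a small console publisher. This is a minimal sketch, not a definitive implementation. It assumes a RabbitMQ broker on localhost with default credentials and an EasyNetQ version that exposes bus.PubSub, as the snippets in this post do. Some EasyNetQ versions may also need a serializer package referenced or registered, so check the documentation for the version you install.

```csharp
using System;
using EasyNetQ;

// Assumption: RabbitMQ is reachable on localhost with default credentials.
// One bus for the whole program; `using` disposes it when the program exits.
using var bus = RabbitHutch.CreateBus("host=localhost");

var message = new User
{
    Name = "Sejal Shah",
    Email = "sejal.shah123@gmail.com",
    Age = 20
};

// EasyNetQ serializes the object and routes it by its .NET type (User).
await bus.PubSub.PublishAsync(message);
Console.WriteLine($"Published User message for {message.Name}.");

// Message contract. In a real solution this would typically live in a shared
// assembly referenced by both the publisher and the subscriber.
public class User
{
    public string Name { get; set; }
    public string Email { get; set; }
    public int Age { get; set; }
}
```

The publisher never names a queue or a subscriber here; the message type alone decides where the message goes.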

  5. Subscribe to the User message type

An EasyNetQ subscriber focuses on a particular type of message, based on the .NET message class. When you subscribe to a message type using the Subscribe method, it creates a persistent queue in RabbitMQ. This queue collects messages of that type. RabbitMQ then sends these messages to the subscriber whenever it's connected.

// Subscribe to messages of type User
bus.PubSub.Subscribe<User>("user_subscription_id", UserAddedHandler);

// Runs for each User message received
public void UserAddedHandler(User msg)
{
    Console.WriteLine($"{msg.Name} user added.");
}

The SubscriptionId matters. EasyNetQ creates a unique queue in RabbitMQ for each distinct combination of message type and SubscriptionId, so subscribers that use different SubscriptionIds each receive their own copy of every message. Each call to Subscribe starts a new queue consumer. Calling Subscribe more than once with the same message type and the same SubscriptionId does not create a second queue; it attaches two consumers to the same queue.

This makes scaling possible without additional configuration: launch multiple instances of the subscriber application, or register several subscribers, all using the same SubscriptionId. RabbitMQ then distributes successive messages to those consumers in round-robin fashion.
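
The effect of the SubscriptionId can be sketched with three subscriptions in one process. This is an illustration under assumptions: the ids "email_service" and "audit_service" are hypothetical names, the broker is assumed to be on localhost, and the output order depends on how RabbitMQ dispatches messages.

```csharp
using System;
using EasyNetQ;

// Assumption: RabbitMQ is reachable on localhost with default credentials.
using var bus = RabbitHutch.CreateBus("host=localhost");

// Different subscription ids (hypothetical names): each id gets its own queue,
// so each of these two subscribers sees every User message.
bus.PubSub.Subscribe<User>("email_service",
    u => Console.WriteLine($"[email #1] welcome {u.Name}"));
bus.PubSub.Subscribe<User>("audit_service",
    u => Console.WriteLine($"[audit] recorded {u.Name}"));

// Same id as the first subscription: a second consumer on the same queue.
// RabbitMQ round-robins messages between [email #1] and [email #2].
bus.PubSub.Subscribe<User>("email_service",
    u => Console.WriteLine($"[email #2] welcome {u.Name}"));

Console.WriteLine("Listening for User messages. Press Enter to exit.");
Console.ReadLine();

// Same message contract as the publisher uses.
public class User
{
    public string Name { get; set; }
    public string Email { get; set; }
    public int Age { get; set; }
}
```

Running a second copy of this program would add more consumers to the same two queues, which is the scaling behavior described above.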

UserAddedHandler is the subscribe callback delegate. When messages arrive from queues subscribed through EasyNetQ, they go into an in-memory queue. There's a single thread that continuously takes messages from this queue and processes them using Action delegates. Because these delegates are handled one by one on a single thread, it's best to avoid long, synchronous IO operations.

Rather than using Subscribe, we can opt for SubscribeAsync. This method enables the delegate to return a Task instantly and then carry out long-running IO operations asynchronously. The subscriber acknowledges the message only after the Task is complete, not merely when the Task is returned from the handler.
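
A minimal sketch of an asynchronous subscriber follows. It assumes a broker on localhost and an EasyNetQ version that exposes bus.PubSub.SubscribeAsync. The handler name HandleUserAddedAsync is hypothetical, and Task.Delay only stands in for real asynchronous IO such as a database write or an HTTP call.

```csharp
using System;
using System.Threading.Tasks;
using EasyNetQ;

// Assumption: RabbitMQ is reachable on localhost with default credentials.
using var bus = RabbitHutch.CreateBus("host=localhost");

// The handler returns a Task; the message is acknowledged once that Task completes.
await bus.PubSub.SubscribeAsync<User>(
    "user_subscription_id",
    msg => HandleUserAddedAsync(msg));

Console.WriteLine("Listening for User messages. Press Enter to exit.");
Console.ReadLine();

// Hypothetical async handler for User messages.
static async Task HandleUserAddedAsync(User msg)
{
    // Placeholder for long-running asynchronous IO.
    await Task.Delay(TimeSpan.FromSeconds(1));
    Console.WriteLine($"{msg.Name} user added.");
}

// Same message contract as the publisher uses.
public class User
{
    public string Name { get; set; }
    public string Email { get; set; }
    public int Age { get; set; }
}
```

Because the handler awaits its IO instead of blocking, the thread that dispatches messages is free to pick up the next one while the work is in flight.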

In this way we can integrate pub/sub messages in our application using EasyNetQ.
