Pub/Sub messaging using EasyNetQ
In my previous RabbitMQ blog post, we explored implementing the pub/sub messaging model with RabbitMQ to enhance communication in applications. We discovered that this process entails tasks such as managing connections, serializing/deserializing messages, enabling microservices to subscribe to topics, binding and configuring queues, and addressing redundant code. To simplify these tasks, we can use EasyNetQ, a .NET Client API that acts as a wrapper over the native RabbitMQ client. EasyNetQ simplifies development and cuts down on the complexity of using RabbitMQ directly. This blog post will focus on EasyNetQ's benefits over standard RabbitMQ and guide you through its implementation.
Key Features of EasyNetQ
-
Simplified API: Offers a more user-friendly API than RabbitMQ's .NET client, easing common tasks like message publishing and subscribing.
-
Automatic Serialization/Deserialization: Automatically converts messages to and from JSON, eliminating the need for extra coding.
-
Easy Configuration: Streamlines the RabbitMQ setup process, great for beginners or those who want a quick configuration.
-
Support for Advanced Patterns: Accommodates complex messaging patterns, including RPC and topic-based publishing, with little configuration.
-
Robust Error Handling and Resilience: Features like automatic reconnection in the event of a connection failure, which enhances the stability of your messaging system.
-
Asynchronous Programming Support: Designed for modern .NET async programming, aiding in building scalable, efficient applications.
-
Active Community and Documentation: Benefits from an active community and comprehensive documentation.
-
Reduced Complexity: By abstracting the complexities of RabbitMQ, EasyNetQ allows developers to focus more on business logic rather than the intricacies of the messaging infrastructure.
Let's see how we can implement pub/sub messaging using EasyNetQ
EasyNetQ follow two simple conventions:
- Messages should be represented by .NET types.
- Messages should be routed by their .NET type.
- Install EasyNetQ
dotnet add package EasyNetQ
- Create instance of IBus to connect to RabbitMQ
var bus = RabbitHutch.CreateBus("host=localhost")
The recommended approach is to maintain a single IBus
instance throughout the lifespan of application and dispose it on application closure.
- Create instance of Message type User:
Message is nothing but .Net public class.
- Publish
User Added
Message using Publish method onIBus
The publisher finishes its task once it sends the message and doesn't need to know who is subscribing. The subscriber just has to listen for the message type.
- Subscribe to User Message type
An EasyNetQ subscriber focuses on a particular type of message, based on the .NET message class. When you subscribe to a message type using the Subscribe method, it creates a persistent queue in RabbitMQ. This queue collects messages of that type. RabbitMQ then sends these messages to the subscriber whenever it's connected.
It's important for the SubscriptionId to be distinct. EasyNetQ creates a unique queue in RabbitMQ for every different combination of message type and SubscriptionId. Using the Subscribe method initiates a new queue consumer. Subscribing to the same message type using the same SubscriptionId more than once leads to the creation of two individual queues, each for a separate consumer.
Scaling can be effortlessly achieved without additional configuration by either launching multiple instances of the subscriber application or having several subscribers. RabbitMQ will then distribute the messages to each consumer one after another in a round-robin manner.
UserAddedHandler
is the subscribe callback delegate. When messages arrive from queues subscribed through EasyNetQ, they go into an in-memory queue. There's a single thread that continuously takes messages from this queue and processes them using Action delegates. Because these delegates are handled one by one on a single thread, it's best to avoid long, synchronous IO operations.
Rather than using Subscribe
, we can opt for SubscribeAsync
. This method enables the delegate to return a Task instantly and then carry out long-running IO operations asynchronously. The subscriber acknowledges the message only after the Task is complete, not merely when the Task is returned from the handler.
In this way we can integrate pub/sub messages in our application using EasyNetQ.