MassTransit #2: In-memory Mediator pattern basics

Pierre Belin

MassTransit is a free, open-source distributed application framework for .NET. This library facilitates service-to-service communication and helps developers build decoupled applications that can scale and manage failure scenarios effectively.

One of its key features is an implementation of the mediator pattern, which this article introduces.

The mediator pattern, in the realm of object-oriented programming, encapsulates how a set of objects interact. MassTransit implements this pattern to ease the handling of in-process messaging and improve application design.

By the end of this article, you will have a basic understanding of the mediator pattern within MassTransit.

If you haven't read it yet, I recommend reading the previous article:

MassTransit #1: A Game-Changer for .NET Messaging
We shall delve into various patterns enabled by MassTransit, furnish examples of situations where it shines, and conclude with its advantages.

Send messages with IMediator

When you use MassTransit's Mediator, you're essentially dispatching a message which is then consumed within the same process. This is a slightly different use case compared to the Send and Publish methods, which are used in interprocess communication scenarios.

To use IMediator, you need to send a request message and then consume it within the same service. This is particularly useful for in-process communication where you want to utilize messaging patterns, without the overhead of external broker communication.

public static async Task<IResult> SubmitGoat(Guid id, IMediator mediator, CancellationToken cancellationToken)
{
    // Send the request in-process and wait for the consumer's GoatAccepted response
    var response = await mediator.SendRequest<GoatSubmitted, GoatAccepted>(new GoatSubmitted(id), cancellationToken);

    // Handle the response here; returning it as-is keeps the example minimal
    return Results.Ok(response);
}

The SendRequest method on IMediator is used to send a request and wait for a response, which is why it defines the type of message sent and the type returned.
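The same round trip can also be written with a request client, which is the form used in the MassTransit mediator documentation. The following is a minimal sketch rather than the article's own code: the record shapes of GoatSubmitted and GoatAccepted, the GoatReview.Samples namespace, and the GoatEndpoints class are assumptions made for illustration.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using MassTransit;
using MassTransit.Mediator;
using Microsoft.AspNetCore.Http;

namespace GoatReview.Samples; // hypothetical namespace

// Assumed message contracts; the article does not show their definitions.
public record GoatSubmitted(Guid Id);
public record GoatAccepted();

public static class GoatEndpoints
{
    public static async Task<IResult> SubmitGoat(Guid id, IMediator mediator, CancellationToken cancellationToken)
    {
        // Create a client bound to the request type, then ask for a typed response.
        IRequestClient<GoatSubmitted> client = mediator.CreateRequestClient<GoatSubmitted>();
        Response<GoatAccepted> response =
            await client.GetResponse<GoatAccepted>(new GoatSubmitted(id), cancellationToken);

        // response.Message holds the GoatAccepted instance sent back by the consumer.
        return Results.Ok(response.Message);
    }
}
```

In both forms the request type and the response type are stated explicitly, so a mismatch with what the consumer actually responds with is visible at the call site.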

📢
The response type sent by the consumer must match the type expected by the mediator; otherwise MassTransit cannot link the response to the request and will raise an error.

Handle messages with IConsumer

The mediator pattern is an integral part of MassTransit, streamlining communication between components and keeping them loosely coupled. It allows in-process messaging, giving your objects a communication channel without requiring explicit references to each other.

The IConsumer<T> interface represents a class that consumes a message of type T. In essence, an IConsumer is a message handler: it receives messages from the message bus (here, the in-memory mediator) and processes them.

public class GoatSubmittedConsumer : IConsumer<GoatSubmitted>
{
    public async Task Consume(ConsumeContext<GoatSubmitted> context)
    {
        var goat = context.Message;
        // Add the logic that handles the submitted goat here
        await context.RespondAsync<GoatAccepted>(new GoatAccepted());
    }
}

ConsumeContext<T> provides a lot of valuable information and utilities for working with the current message inside the consumer's Consume method. The main ones are:

  1. Message: gives access to all the properties of the message sent via the mediator. The type of the message is T. Typically, this is a record with the information needed to process the request.
  2. CancellationToken: the token to forward to every asynchronous call made while handling the message. It is tied to the processing of the message as a whole, not just to the Consume method, so it is the default token to pass to async calls, especially if you're using Entity Framework Core.
  3. RespondAsync: sends a message back to the sender, which is exactly what the request/response flow of this pattern requires. The result of the request is returned as another record through this method.
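To make the three members above concrete, here is a minimal sketch of a consumer that uses each of them. IGoatStore and its SaveAsync method are hypothetical, introduced only to have an asynchronous call to forward the token to; the record shapes and the namespace are likewise assumptions.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using MassTransit;

namespace GoatReview.Samples; // hypothetical namespace

// Assumed message contracts; the article does not show their definitions.
public record GoatSubmitted(Guid Id);
public record GoatAccepted();

// Hypothetical storage abstraction, not part of MassTransit.
public interface IGoatStore
{
    Task SaveAsync(Guid goatId, CancellationToken cancellationToken);
}

public class GoatSubmittedConsumer : IConsumer<GoatSubmitted>
{
    private readonly IGoatStore _store;

    public GoatSubmittedConsumer(IGoatStore store) => _store = store;

    public async Task Consume(ConsumeContext<GoatSubmitted> context)
    {
        // Message: the GoatSubmitted instance sent through the mediator.
        Guid goatId = context.Message.Id;

        // CancellationToken: forwarded to downstream asynchronous calls.
        await _store.SaveAsync(goatId, context.CancellationToken);

        // RespondAsync: sends the result back to the caller waiting on the request.
        await context.RespondAsync(new GoatAccepted());
    }
}
```

Because consumers are resolved from the dependency injection container, constructor dependencies such as the hypothetical IGoatStore would need to be registered alongside the consumer.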

More broadly, it provides a wide array of functionality for working with messages in consumers: controlling how messages are sent and received, handling errors, applying retry policies, and managing long-running conversations, among other things.

Register consumer in the ServiceProvider

To use consumers, MassTransit provides its own registration in dependency injection via the AddMediator extension method.

📢
In our case, we're only using the features of the mediator pattern, so there's no need to load the entire library. If you want to use other functionalities, we recommend using the `AddMassTransit` method instead.

To declare the pattern elements, all we need to do is register the message consumers. The IMediator implementation is already injected. There are several ways of doing this: one by one, by namespace or by assembly:

services.AddMediator(x =>
{
    // To register consumers one by one
    x.AddConsumer<GoatSubmittedConsumer>();

    // To register all consumers of a namespace
    x.AddConsumersFromNamespaceContaining(typeof(GoatSubmittedConsumer));

    // To register all consumers of an assembly
    x.AddConsumers(typeof(GoatSubmittedConsumer).Assembly);
});
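Putting registration, consumer, and request together, the whole flow can be sketched as a small console program. This is an illustrative sketch under assumptions, not the article's code: the record shapes, the namespace, and the Program class are made up for the example, and the request goes through a request client rather than the SendRequest call shown earlier.

```csharp
using System;
using System.Threading.Tasks;
using MassTransit;
using MassTransit.Mediator;
using Microsoft.Extensions.DependencyInjection;

namespace GoatReview.Samples; // hypothetical namespace

// Assumed message contracts; the article does not show their definitions.
public record GoatSubmitted(Guid Id);
public record GoatAccepted();

public class GoatSubmittedConsumer : IConsumer<GoatSubmitted>
{
    // Respond immediately; real handling logic would go before this call.
    public Task Consume(ConsumeContext<GoatSubmitted> context) =>
        context.RespondAsync(new GoatAccepted());
}

public static class Program
{
    public static async Task Main()
    {
        // Register the mediator and a single consumer in the container.
        var services = new ServiceCollection();
        services.AddMediator(x => x.AddConsumer<GoatSubmittedConsumer>());

        await using var provider = services.BuildServiceProvider();
        var mediator = provider.GetRequiredService<IMediator>();

        // Send the request in-process and wait for the consumer's response.
        var client = mediator.CreateRequestClient<GoatSubmitted>();
        var response = await client.GetResponse<GoatAccepted>(new GoatSubmitted(Guid.NewGuid()));

        Console.WriteLine(response.Message);
    }
}
```

No transport or broker is configured anywhere in this sketch, which is the point of the mediator: the message never leaves the process.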

Summary

We've peeled back the layers of MassTransit's Mediator pattern in this exploration, focusing on how its tools, including IMediator and IConsumer, can simplify in-process communication in .NET applications.

The pivotal role of ConsumeContext<T> was also examined, which provides rich functionality for consumer message handling. Importantly, we also learned how to effectively register consumers in MassTransit using AddConsumer.

This exploration should give you a solid foundation to harness MassTransit's power in your .NET applications, encouraging better design and efficient messaging.

Next article:

MassTransit #3: Migrating mediator pattern from MediatR
A well-known implementation of this pattern is found in the MediatR library, a popular mediator implementation in .NET.

To go further:

Mediator
MassTransit includes a mediator implementation, with full support for consumers, handlers, and sagas (including saga state machines). MassTransit Mediator runs in-process and in-memory, no transport is required. For maximum performance, messages are passed by reference, instead than being serialized…

Have a goat day 🐐


