MassTransit #3: Migrating mediator pattern from MediatR

In today's complex software landscape, effective and maintainable design patterns play a crucial role in ensuring robust and scalable systems. One such pattern is the Mediator Pattern, which defines how different components of a system interact with each other, thereby reducing the dependencies between them. A well-known implementation of this pattern is found in the MediatR library, a popular mediator implementation in .NET.

In this article, we will explore a design that draws inspiration from the MediatR mediator pattern within the context of MassTransit.

If you didn't read it, I recommend you to start with the previous article:

MassTransit #2: In-memory Mediator pattern basics
By the end of this article, you will have a basic understanding of the mediator pattern within MassTransit.

Configure queries and commands

One main difference about the mediator pattern is the MediatR describes the return of querys and commands, which is not the case for MassTransit, which only works with a notion of consumer.

The first step is to put interfaces on our querys and commands to directly define their returns when the type is declared, rather than when it is consumed.

public interface IRequest {}

public interface IRequest<TOutput> : IRequest, Request<Result<TOutput, IErrorReason>> where TOutput : class {}

public interface IQuery<TOutput> : IRequest<TOutput> where TOutput : class {}

public interface ICommand<TOutput> : IRequest<TOutput> where TOutput : class {}

Each message will then inherit the correct type, either query or command.

public record GetSitesQuery : IQuery<IEnumerable<GetSiteResult>>;

Abstract handlers

According to the MassTransit documentation, the IRequestHandler serves as a contract for request handling in a message-based system. This concept is akin to MediatR's handlers, where individual handlers are designed to manage specific requests.

public interface IRequestHandler<T> : IRequestHandler
    where T : IRequest
{
    Task<object> Consume(T request, CancellationToken cancellationToken);
}

Next, you have an abstract class named RequestHandler, which implements the above interface. It provides a blueprint for request consumption and ensures a consistent structure for handling various types of requests.

This architecture is also aligned with the MediatR pattern, where handlers are implemented based on the contracts defined by interfaces.

public abstract class RequestHandler<T, TOutput> : IRequestHandler<T>
    where T : class, IRequest
    where TOutput : class
{
    public async Task<object> Consume(T request, CancellationToken cancellationToken)
    {
        return await Handle(request, cancellationToken);
    }

    public async Task<object> Consume(IRequest request, CancellationToken cancellationToken)
    {
        return await Handle((request as T)!, cancellationToken);
    }

    protected abstract Task<Result<TOutput, IErrorReason>> Handle(T request, CancellationToken cancellationToken);
}

In MediatR, two primary types of messages are handled: Queries (which return a result) and Commands (which perform an action but don't necessarily return a value). In the given code, you have distinguished between these two types by creating two abstract classes: QueryHandler and CommandHandler.

  • Query: Represents a request for information and typically returns a result.
  • Command: Represents a request to perform an action and may or may not return a result.
💡
In our case, the handle of both is identical. The only difference is in the naming according to the use case, to enable a distinction to be made later if necessary.

These are encapsulated in the following code:

public abstract class QueryHandler<TQuery,TOutput> : RequestHandler<TQuery, TOutput>
    where TQuery : class, IQuery<TOutput> 
    where TOutput : class
{

}

public abstract class CommandHandler<TCommand, TOutput> : RequestHandler<TCommand, TOutput>
    where TCommand : class, ICommand<TOutput>
    where TOutput : class
{
}

The implementation is quite simple. GetGoatQueryHandler is a specific QueryHandler class that handles a query to retrieve information about a goat. In this code, the request is processed to obtain a GoatResult.

public sealed class GetGoatQueryHandler : QueryHandler<GetGoatQuery, GoatResult>
{
    protected override async Task<Result<GetGoatResult, IErrorReason>> Handle(GetGoatQuery request, CancellationToken cancellationToken)
    {
        //
    }
}

By extending the abstract QueryHandler, it provides concrete implementation for the method Handle, where the logic to fetch the desired goat information resides.

The parallel with MediatR is evident in the clear separation of query handling logic, allowing for a focused and maintainable approach to data retrieval.

Dispatch inside the mediator

The Dispatcher pattern centralizes request dispatching, directing the requests to the appropriate handlers. In MediatR, this pattern is also present, allowing various handlers to manage specific types of requests.

public interface IDispatcher
{
    Task<Result<TOutput, IErrorReason>> Send<TOutput>(IRequest<TOutput> request, CancellationToken cancellationToken = default) where TOutput : class;
}

MassTransitDispatcher serves as the core component responsible for sending the requests to appropriate handlers, drawing a parallel with MediatR's ability to dispatch requests to their respective handlers.

public class MassTransitDispatcher : IDispatcher
{
    private readonly IMediator _mediator;

    public MassTransitDispatcher(IMediator mediator)
    {
        _mediator = mediator;
    }

    public Task<Result<TOutput, IErrorReason>> Send<TOutput>(IRequest<TOutput> request, CancellationToken cancellationToken = default) where TOutput : class
    {
        return _mediator.SendRequest(request, cancellationToken);
    }
}

By utilizing MassTransit's IMediator, it facilitates the process of sending requests within MassTransit, aligning the concepts of MediatR's dispatcher with MassTransit's robust messaging system.

Finally, inject the dispatcher directly into the class that will use the mediator model and send the correct query as a parameter. The response is exactly as defined in the IQuery interface.

public class GoatHandlers
{
    private readonly IDispatcher _dispatcher;

    public GoatHandlers(IDispatcher dispatcher)
    {
        _dispatcher = dispatcher;
    }

    public async Task<IResult> GetGoat([FromQuery] int goatId, CancellationToken cancellationToken)
    {
        var response = await _dispatcher.Send(new GetGoatQuery(goatId), cancellationToken);
        return response.Match(
            Results.Ok(),
            Results.BadRequest());
    }
}

This class illustrates how MediatR's concept of handlers can be encapsulated within a domain-specific context in MassTransit, keeping the code modular and aligned with the business domain.

Moving from in-process to out-of-process messaging

I'd like to make an obligatory disclaimer to know before embarking on library replacement.

Unlike MediatR, which is an in-process messaging system, MassTransit is an out-process messaging system.

It will have serious implications for the overall design of the application, and will require reflection that goes beyond the simple replacement of lines of code, particularly with regard to asynchronism management.

The problem arises when you trigger messages that can be processed by several consumers at the same time:

  • MediatR: messages are processed one after the other
  • MassTransit: all messages are processed at the same time for all consumers.

This may seem like a small detail, but it's not all that insignificant.
Some layers, such as Entity Framework's DbContext layer, don't manage asynchronous context access, which can create side-effects requiring the addition of an extra data layer to deal with the problem.

Summary

In conclusion, MassTransit provides a seamless implementation of the Mediator Pattern, making it a powerful choice for building complex and distributed systems. The framework's alignment with MediatR's design philosophy allows for easy adaptation and efficient handling of queries and commands.

By centralizing request dispatching, MassTransit simplifies the development process and enables the creation of modular and domain-specific components. This integration enhances code organization and flexibility, making MassTransit an excellent option for robust and scalable applications.

Next article:

MassTransit #4: Customizing middleware with Filters
MassTransit, highlighting the pivotal role of Filters. If you’re an advanced user, this deep-dive is crafted especially for you!

To go further:

Mediator
MassTransit includes a mediator implementation, with full support for consumers, handlers, and sagas (including saga state machines). MassTransit Mediator runs in-process and in-memory, no transport is required. For maximum performance, messages are passed by reference, instead than being serialized…

Have a goat day 🐐

Most of the article's content comes from Rémi Henache's work on one of our customer projects.