MassTransit #4: Customizing middleware with Filters

Today, we venture deep into the intricacies of the Mediator Pattern in MassTransit, highlighting the pivotal role of Filters. If you're an advanced user, this deep-dive is crafted especially for you!

In the vast realm of software design patterns, the Mediator Pattern serves as a cornerstone. It promotes a reduced coupling between classes by ensuring that instead of classes communicating directly with each other, they interact through a mediator.

If you didn't read it, I recommend you to start with the previous article:

MassTransit #3: Migrating mediator pattern from MediatR
A well-known implementation of this pattern is found in the MediatR library, a popular mediator implementation in .NET.

Amplify Mediator pattern with IFilter

A mediator with filters, in MassTransit's context, is not just a mediator—it's a mediator on steroids. These filters, when used adeptly, can intercept, alter, or dictate the journey of a message before it reaches its designated consumer. Whether it's for appending functionalities like logging, validation, or even error handling, filters ensure you don’t clutter or compromise the core logic of consumers

Dive a level deeper, and we encounter the IFilter. This plays a pivotal role in MassTransit as it represents a behavior that can be injected into the pipeline. It works as a sort of interceptor, capturing and potentially altering requests or their results as they pass through.

In any middleware or pipeline setup, context isn’t just a buzzword—it’s the essence of execution. MassTransit, with its brilliant design, offers developers two flavors of context: the ConsumeContext and the actual context of the message. While the latter pertains to the message's intrinsic properties and its journey details, ConsumeContext is the overarching context that encapsulates the message, providing details about the environment, headers, and more.

Here's a brief code snippet to elucidate:

public async Task Consume(ConsumeContext<YourMessageType> context)
{
    var messageHeader = context.Headers.Get<string>("YourHeaderName");
    var actualMessage = context.Message;
    // Further processing...
}

In the snippet, ConsumeContext provides access to both the message and its associated headers, allowing for nuanced consumption based on header details or other context properties.

Just below, we'll see the link to the filters.

Implement IFilter easily

What’s intriguing about filters in MassTransit is their generic nature. They can be applied to any message type. But let’s not forget the magic of specificity! Filters can be tailored for specific messages, as illustrated below:

public class ExceptionBehaviorFilter<T> :
    IFilter<ConsumeContext<T>>
    where T : class
{
    public ExceptionBehaviorFilter() { }

    public async Task Send(ConsumeContext<T> context, IPipe<ConsumeContext<T>> next)
    {
        try
        {
            await next(context);
        }
        catch(Exception ex)
        {
            // Handle here the exception, for example log and return a specific Results.BadRequest() is your application returns HttpResponse
            _logger.LogError(ex.Message);
            await context.RespondWithError(new InternalError());
        }
    }
}

This code depicts a generic behavior filter tailored to manage exceptions. Here we can see the 2 parameters of the filters when they are called:

  • Context, which contains all the request information (as seen above)
  • Next, which accesses the next element in the pipe from this filter

On the flip side, we can also introduce specific behaviors, for instance, an authorization filters as:

public class AuthorizationBehaviorFilter<T> :
    IFilter<ConsumeContext<T>>
    where T : class
{
    public AuthorizationBehaviorFilter() { }

    public async Task Send(ConsumeContext<T> context, IPipe<ConsumeContext<T>> next)
    {
        if (UserIsAuthentified())
            await next(request, ct);

        await context.RespondWithError(new Forbidden());
    }
}

While working with MassTransit, the order in which services are defined has a direct impact on their execution sequence. In the given code:

builder.Services.AddMediator(x =>
{
    x.ConfigureMediator((context, cfg) =>
    {
        cfg.UseConsumeFilter(typeof(ExceptionBehaviorFilter<>), context);
        cfg.UseConsumeFilter(typeof(AuthorizationBehaviorFilter<>), context);
    });
});

The ExceptionBehaviorFilter is defined before the AuthorizationBehaviorFilter to ensures exception handling is prioritized over authorization.

As a result, the exception behavior will be executed before the authorization behavior. This is how the hierarchy between filters is created. When developing, it's very important to catch all errors to prevent the application from crashing and stopping working.

To make this possible, the ExceptionBehaviorFilter is the first filter to be declared, so that it encompasses all subsequent filters. If ever another filter causes an error, it will be caught.

Summary

To wrap up, filters, though an advanced feature within MassTransit, offer unparalleled control and versatility.

Whether it's ensuring exception handling, logging, or nuanced message processing based on context, they’re a tool every advanced MassTransit user should master.

To go further:

Middleware
MassTransit is built using a network of pipes and filters to dispatch messages. A pipe is composed of a series of filters, each of which is a key atom and are described below.

Have a goat day 🐐