Building a High-Performance Message Queue with Redis Streams

Cyril Canovas
Cyril Canovas
Building a High-Performance Message Queue with Redis Streams
Table of Contents
Table of Contents

Beyond Pub/Sub: Implementing Reliable Work Distribution with Redis

When developers reach for Redis as a messaging solution, they typically gravitate toward its Pub/Sub capabilities. While Pub/Sub excels at broadcasting messages to multiple subscribers, it falls short for reliable work distribution scenarios. Messages are transient - any offline subscriber misses messages entirely, and there's no built-in mechanism for ensuring message processing or handling failures.

Redis Streams, introduced in Redis 5.0, offers a compelling alternative that combines durability, reliable delivery, and high performance. The RedisRoundRobinQueue implementation showcased here demonstrates how to leverage this feature for building robust distributed work queues.

Message Publishing Implementation

The implementation begins with a clean interface for message publishing:

public class RedisRoundRobinQueue : IDisposable, IAsyncDisposable
{
    private readonly ConnectionMultiplexer _redis;
    private readonly IDatabase _db;
    private readonly string _streamKey;
    private readonly string _consumerGroup;
    private bool _initialized;
    
    public async Task PublishMessage(Dictionary<string, string> message)
    {
        await CheckIfInitialized();
        var values = new NameValueEntry[message.Count];
        int i = 0;
        foreach (var kvp in message)
        {
            values[i] = new NameValueEntry(kvp.Key, kvp.Value);
            i++;
        }
        _ = await _db.StreamAddAsync(_streamKey, values);
    }
}

Consumer Groups Architecture

The core architecture revolves around Redis Stream's consumer groups. Unlike basic streams where each consumer maintains its own position, consumer groups enable distributed message processing with exactly-once delivery guarantees. The initialization logic handles both stream and consumer group creation:

private async Task InitializeStreamAndGroup()
{
    try
    {
        await _db.StreamCreateConsumerGroupAsync(_streamKey, _consumerGroup, "0-0", createStream: true);
    }
    catch (RedisException ex) when (ex.Message.Contains(CstBusyGroup))
    {
        // Group already exists, ignore
        _logger?.LogWarning($"{ex.Message}");
    }
}

Key Optimizations

Message consumption implements several key optimizations:

  1. Prefetching: Rather than reading one message at a time, the implementation fetches batches of 16 messages (PrefetchCount) to reduce Redis round-trips:
var streamEntries = await _db.StreamReadGroupAsync(
    _streamKey,
    groupName: _consumerGroup,
    consumerName: consumerId,
    position: ">",
    count: PrefetchCount);
  1. Batch Acknowledgments: Messages are acknowledged and deleted in batches rather than individually:
if (ids.Count > 0)
{
    var arIds = ids.ToArray();
    _ = await _db.StreamAcknowledgeAsync(_streamKey, _consumerGroup, arIds);
    _ = await _db.StreamDeleteAsync(_streamKey, arIds);
}
  1. Failure Recovery: The implementation includes sophisticated error handling through the ClaimPendingMessages mechanism:
public async Task ClaimPendingMessages(string consumerId, TimeSpan idleTime)
{
    await CheckIfInitialized();
    var idleTimeInMs = (long)idleTime.TotalMilliseconds;
    var pending = await _db.StreamPendingMessagesAsync(_streamKey, _consumerGroup, 10, consumerId);
    
    foreach (var message in pending)
    {
        if (message.IdleTimeInMilliseconds >= idleTimeInMs)
        {
            await _db.StreamClaimAsync(_streamKey, _consumerGroup, consumerId, 
                minIdleTimeInMs: idleTimeInMs, messageIds: new[] { message.MessageId });
        }
    }
}

Resource Management

The implementation includes robust resource management through both synchronous and asynchronous disposal patterns:

public async ValueTask DisposeAsync()
{
    await DisposeAsyncCore();
    GC.SuppressFinalize(this);
}

protected virtual async ValueTask DisposeAsyncCore()
{
    ReleaseUnmanagedResources();
    await _redis.DisposeAsync();
}

Performance Testing

Real-world performance testing demonstrates impressive throughput. The included benchmark program tests with 16 publishers and 8 subscribers processing a million messages:

var messageCount = 1000000;
int publisherCount = 16;
var publishers = Enumerable.Range(1, publisherCount).Select(
    x =>
    {
        return Task.Run(async () =>
        {
            var currentId = 0L;
            while ((currentId = Interlocked.Increment(ref idx)) <= messageCount)
            {
                await queue.PublishMessage(new Dictionary<string, string>
                {
                    { "type", "order" },
                    { "id", $"{currentId:D9}" },
                    { "amount", "100.00" }
                });
            }
        });
    }).ToArray();

The subscriber implementation demonstrates concurrent message processing:

int subscriberCount = 8;
var subscribers = Enumerable.Range(1, subscriberCount).Select(
    x =>
        queue.StartSubscriber($"consumer{x}", (message) =>
        {
            var id = int.Parse(message["id"]);
            var stopped = id >= messageCount;
            return Task.FromResult(stopped);
        })).ToArray();

This implementation achieves several thousand messages per second in throughput, showcasing Redis Streams' capability to handle high-load scenarios efficiently. The combination of batch operations, prefetching, and concurrent processing enables this performance while maintaining strong delivery guarantees.

Key Advantages

Key advantages of this implementation over traditional Redis Pub/Sub include:

  • Guaranteed message delivery with exactly-once processing
  • Automatic work distribution across multiple consumers
  • Message persistence and failure recovery
  • Efficient batch processing and acknowledgment
  • Clean resource management through modern disposal patterns

This pattern is particularly valuable for scenarios requiring reliable work distribution, such as:

  • Order processing systems
  • Task distribution in microservices architectures
  • Event processing pipelines
  • Background job queues

The implementation demonstrates how Redis Streams can serve as a foundation for building robust, high-performance message queues without additional infrastructure beyond your existing Redis deployment.

Have a goat day 🐐



Join the conversation.

Great! Check your inbox and click the link
Great! Next, complete checkout for full access to Goat Review
Welcome back! You've successfully signed in
You've successfully subscribed to Goat Review
Success! Your account is fully activated, you now have access to all content
Success! Your billing info has been updated
Your billing was not updated