Beyond Pub/Sub: Implementing Reliable Work Distribution with Redis
When developers reach for Redis as a messaging solution, they typically gravitate toward its Pub/Sub capabilities. While Pub/Sub excels at broadcasting messages to multiple subscribers, it falls short for reliable work distribution: messages are transient, any subscriber that is offline misses them entirely, and there is no built-in mechanism for confirming message processing or handling failures.
Redis Streams, introduced in Redis 5.0, offers a compelling alternative that combines durability, reliable delivery, and high performance. The RedisRoundRobinQueue implementation showcased here demonstrates how to leverage this feature for building robust distributed work queues.
Message Publishing Implementation
The implementation begins with a clean interface for message publishing:
public class RedisRoundRobinQueue : IDisposable, IAsyncDisposable
{
    private readonly ConnectionMultiplexer _redis;
    private readonly IDatabase _db;
    private readonly string _streamKey;
    private readonly string _consumerGroup;
    private bool _initialized;

    public async Task PublishMessage(Dictionary<string, string> message)
    {
        await CheckIfInitialized();
        var values = new NameValueEntry[message.Count];
        int i = 0;
        foreach (var kvp in message)
        {
            values[i] = new NameValueEntry(kvp.Key, kvp.Value);
            i++;
        }
        _ = await _db.StreamAddAsync(_streamKey, values);
    }
}
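A caller might use the publisher like the sketch below. Note that the constructor parameters shown here are assumptions — the excerpt above doesn't include the constructor, so the connection string, stream key, and group name arguments are illustrative:

```csharp
// Hypothetical usage sketch; the constructor signature is assumed, since
// the excerpt doesn't show it. Requires a running Redis 5.0+ instance.
await using var queue = new RedisRoundRobinQueue(
    "localhost:6379",    // connection string (assumed parameter)
    "orders-stream",     // stream key (assumed parameter)
    "order-processors"); // consumer group name (assumed parameter)

await queue.PublishMessage(new Dictionary<string, string>
{
    { "type", "order" },
    { "id", "000000001" },
    { "amount", "100.00" }
});
```

Each dictionary entry becomes a field of the stream entry, so consumers can read individual values back by name.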
Consumer Groups Architecture
The core architecture revolves around Redis Streams' consumer groups. Unlike basic streams, where each consumer maintains its own position, consumer groups distribute messages across consumers with at-least-once delivery: each message is handed to exactly one consumer in the group and tracked as pending until that consumer acknowledges it. The initialization logic handles both stream and consumer group creation:
private async Task InitializeStreamAndGroup()
{
    try
    {
        await _db.StreamCreateConsumerGroupAsync(_streamKey, _consumerGroup, "0-0", createStream: true);
    }
    catch (RedisException ex) when (ex.Message.Contains("BUSYGROUP"))
    {
        // The consumer group already exists; safe to ignore.
        _logger?.LogWarning("Consumer group already exists: {Message}", ex.Message);
    }
}
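The `CheckIfInitialized` helper called from the publish and consume paths isn't shown in the excerpt. One plausible sketch — an assumption, not the author's actual code — uses a `SemaphoreSlim` so that concurrent callers trigger initialization only once:

```csharp
// Hedged sketch of the CheckIfInitialized helper referenced above; the
// actual implementation isn't shown in the article, so this is one
// plausible shape using SemaphoreSlim for thread-safe lazy initialization.
private readonly SemaphoreSlim _initLock = new(1, 1);

private async Task CheckIfInitialized()
{
    if (_initialized)
        return;                 // fast path: already initialized

    await _initLock.WaitAsync();
    try
    {
        if (!_initialized)      // double-check inside the lock
        {
            await InitializeStreamAndGroup();
            _initialized = true;
        }
    }
    finally
    {
        _initLock.Release();
    }
}
```

The double-check keeps the common case lock-free while still guaranteeing the consumer group is created exactly once per instance.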
Key Optimizations
Message consumption implements several key optimizations:
- Prefetching: Rather than reading one message at a time, the implementation fetches batches of 16 messages (PrefetchCount) to reduce Redis round-trips:
var streamEntries = await _db.StreamReadGroupAsync(
    _streamKey,
    groupName: _consumerGroup,
    consumerName: consumerId,
    position: ">",
    count: PrefetchCount);
- Batch Acknowledgments: Messages are acknowledged and deleted in batches rather than individually:
if (ids.Count > 0)
{
    var arIds = ids.ToArray();
    _ = await _db.StreamAcknowledgeAsync(_streamKey, _consumerGroup, arIds);
    _ = await _db.StreamDeleteAsync(_streamKey, arIds);
}
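Putting prefetching and batch acknowledgment together, the inner consume loop plausibly looks like the sketch below. This is hedged: the real `StartSubscriber` body isn't shown in the article, the handler signature is inferred from the benchmark code later on, and the idle back-off delay is an assumption:

```csharp
// Sketch of a consume loop combining the fragments above; the actual
// StartSubscriber implementation isn't shown, so details such as the
// poll delay are assumptions.
bool stopped = false;
var ids = new List<RedisValue>(PrefetchCount);
while (!stopped)
{
    var streamEntries = await _db.StreamReadGroupAsync(
        _streamKey, _consumerGroup, consumerId, ">", count: PrefetchCount);

    if (streamEntries.Length == 0)
    {
        await Task.Delay(50);   // assumed back-off while the stream is idle
        continue;
    }

    ids.Clear();
    foreach (var entry in streamEntries)
    {
        var message = entry.Values.ToDictionary(
            v => (string)v.Name, v => (string)v.Value);
        stopped = await handler(message);   // handler returns true to stop
        ids.Add(entry.Id);
    }

    // Acknowledge and delete the whole batch in two round-trips.
    if (ids.Count > 0)
    {
        var arIds = ids.ToArray();
        _ = await _db.StreamAcknowledgeAsync(_streamKey, _consumerGroup, arIds);
        _ = await _db.StreamDeleteAsync(_streamKey, arIds);
    }
}
```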
- Failure Recovery: The implementation recovers from consumer failures through the ClaimPendingMessages mechanism:
public async Task ClaimPendingMessages(string consumerId, TimeSpan idleTime)
{
    await CheckIfInitialized();
    var idleTimeInMs = (long)idleTime.TotalMilliseconds;
    var pending = await _db.StreamPendingMessagesAsync(_streamKey, _consumerGroup, 10, consumerId);
    foreach (var message in pending)
    {
        if (message.IdleTimeInMilliseconds >= idleTimeInMs)
        {
            await _db.StreamClaimAsync(_streamKey, _consumerGroup, consumerId,
                minIdleTimeInMs: idleTimeInMs, messageIds: new[] { message.MessageId });
        }
    }
}
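A recovery worker might invoke this periodically, reassigning entries that were read but never acknowledged. The scheduling below is an assumption — the article doesn't specify how often claiming runs or what idle threshold it uses:

```csharp
// Assumed recovery loop: periodically claim messages that have sat
// unacknowledged for longer than a threshold. The interval and threshold
// values are illustrative, not from the original implementation.
var claimInterval = TimeSpan.FromSeconds(30);
var idleThreshold = TimeSpan.FromMinutes(1);

while (!cancellationToken.IsCancellationRequested)
{
    await queue.ClaimPendingMessages("consumer1", idleThreshold);
    await Task.Delay(claimInterval, cancellationToken);
}
```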
Resource Management
The implementation includes robust resource management through both synchronous and asynchronous disposal patterns:
public async ValueTask DisposeAsync()
{
    await DisposeAsyncCore();
    GC.SuppressFinalize(this);
}

protected virtual async ValueTask DisposeAsyncCore()
{
    ReleaseUnmanagedResources();
    await _redis.DisposeAsync();
}
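In practice, this lets callers tie the queue's lifetime to a scope with `await using`, so the Redis connection is released even when an exception escapes (the constructor arguments are elided, since the excerpt doesn't show them):

```csharp
// The async disposal pattern above makes scoped usage safe: DisposeAsync
// runs when the block exits, including on exceptions.
await using (var queue = new RedisRoundRobinQueue(/* ... */))
{
    await queue.PublishMessage(new Dictionary<string, string>
    {
        { "type", "ping" }
    });
} // DisposeAsync runs here, closing the multiplexer
```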
Performance Testing
Real-world performance testing demonstrates impressive throughput. The included benchmark program tests with 16 publishers and 8 subscribers processing a million messages:
var messageCount = 1000000;
int publisherCount = 16;
var idx = 0L; // shared counter; Interlocked.Increment hands each publisher a unique id

var publishers = Enumerable.Range(1, publisherCount).Select(
    x =>
    {
        return Task.Run(async () =>
        {
            var currentId = 0L;
            while ((currentId = Interlocked.Increment(ref idx)) <= messageCount)
            {
                await queue.PublishMessage(new Dictionary<string, string>
                {
                    { "type", "order" },
                    { "id", $"{currentId:D9}" },
                    { "amount", "100.00" }
                });
            }
        });
    }).ToArray();
The subscriber implementation demonstrates concurrent message processing:
int subscriberCount = 8;
var subscribers = Enumerable.Range(1, subscriberCount).Select(
    x =>
        queue.StartSubscriber($"consumer{x}", (message) =>
        {
            var id = int.Parse(message["id"]);
            var stopped = id >= messageCount;
            return Task.FromResult(stopped);
        })).ToArray();
This implementation achieves several thousand messages per second in throughput, showcasing Redis Streams' capability to handle high-load scenarios efficiently. The combination of batch operations, prefetching, and concurrent processing enables this performance while maintaining strong delivery guarantees.
Key Advantages
Key advantages of this implementation over traditional Redis Pub/Sub include:
- At-least-once delivery with explicit acknowledgment
- Automatic work distribution across multiple consumers
- Message persistence and failure recovery
- Efficient batch processing and acknowledgment
- Clean resource management through modern disposal patterns
This pattern is particularly valuable for scenarios requiring reliable work distribution, such as:
- Order processing systems
- Task distribution in microservices architectures
- Event processing pipelines
- Background job queues
The implementation demonstrates how Redis Streams can serve as a foundation for building robust, high-performance message queues without additional infrastructure beyond your existing Redis deployment.
Have a goat day 🐐