Bi-Directional gRPC Stream Cache Server in C#

Bonjour! In the ever-evolving landscape of modern software development, having tools that optimize performance, scalability, and real-time processing is paramount. And among such tools, gRPC in C# stands out. Today, we delve deeper into the nuances of gRPC, its applications in distributed caching, and how it can revolutionize real-time monitoring of critical hardware equipment.

Why gRPC?

Google's gRPC is not merely another remote procedure call (RPC) system. It's an open-source framework that provides high-performance, language-agnostic communication between applications. Its compact binary serialization (Protocol Buffers) and low latency make it especially suited for microservice architectures and other distributed systems.

A Simple Distributed Cache Implementation with gRPC

Distributed caching optimizes application performance by allowing data to be stored across multiple machines. With gRPC in C#, this becomes even more streamlined:

  1. Server Creation: First, a server instance is established that listens for incoming cache requests.
  2. Client Requests: Client applications can then send cache requests to this server, either to fetch or store data (typically GetKey and SetKey methods).
  3. Response and Storage: The gRPC server, upon receiving the request, either returns the cached data or stores the new data, as necessary.

So let's start with the implementation. First, create a solution from the project templates (use the "ASP.NET Core gRPC Service" template).

At this point, we have a basic gRPC server with a Greeter service that returns "Hello {name}".

We define a proto file for our cacher service:

syntax = "proto3";

option csharp_namespace = "SimplegRPCCacheService";

package simplecache;

service Cacher {
	rpc GetKey (GetKeyRequest) returns (GetKeyResponse) {}
	rpc SetKey (SetKeyRequest) returns (SetKeyResponse) {}
}

message GetKeyRequest {
	string key = 1;
}

message GetKeyResponse {
	bytes key_value = 1;
	optional bool not_found = 2;
}

message SetKeyRequest {
	string key = 1;
	bytes key_value = 2;
	optional bool remove_key = 3;
}

message SetKeyResponse {
	int32 set_key_result = 1;
	optional string error_msg = 2;
}

And the corresponding implementation:

using Google.Protobuf;
using Grpc.Core;
using Microsoft.Extensions.Logging;

public class CacherService : Cacher.CacherBase
{
    private readonly ILogger<CacherService> _logger;
    private readonly IKeyValueStore _keyValueStore;
    public CacherService(ILogger<CacherService> logger, IKeyValueStore keyValueStore)
    {
        _keyValueStore = keyValueStore;
        _logger = logger;
    }

    public override Task<GetKeyResponse> GetKey(GetKeyRequest request, ServerCallContext context)
    {
        _logger.LogDebug($">>GetKey() key={request.Key}");
        var (isError, result) = _keyValueStore.GetKey(request.Key);
        if (isError)
        {
            _logger.LogDebug($"<<GetKey() key={request.Key} not found");
            return Task.FromResult(new GetKeyResponse()
            {
                NotFound = true,
                KeyValue = ByteString.Empty
            });
        }
        _logger.LogDebug($"<<GetKey() key={request.Key} found");
        return Task.FromResult(new GetKeyResponse()
        {
            KeyValue = result == null ? ByteString.Empty : ByteString.CopyFrom(result)
        });
    }

    public override Task<SetKeyResponse> SetKey(SetKeyRequest request, ServerCallContext context)
    {
        _logger.LogDebug($">>SetKey() key={request.Key},remove_key={request.RemoveKey}");
        var result = _keyValueStore.SetKey(request.Key, request.KeyValue.ToByteArray(), request.RemoveKey);
        _logger.LogDebug($"<<SetKey() key={request.Key},remove_key={request.RemoveKey},set_key_result={result}");
        return Task.FromResult(new SetKeyResponse()
        {
            SetKeyResult = result,
        });
    }
}

Here, the IKeyValueStore is an in-memory store backed by .NET's ConcurrentDictionary.
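The store itself is not listed in this article, so here is a minimal sketch of what it could look like. The IKeyValueStore signatures are inferred from the calls made in CacherService; the class name InMemoryKeyValueStore and the return-code convention (0 for success, 1 for failure) are assumptions made for illustration and may differ from the repository.

```csharp
using System.Collections.Concurrent;

// Hypothetical interface, inferred from how CacherService calls the store.
public interface IKeyValueStore
{
    // isError is true when the key is absent.
    (bool isError, byte[]? result) GetKey(string key);

    // Assumed convention: 0 on success, non-zero on failure.
    int SetKey(string key, byte[] value, bool removeKey);
}

// Hypothetical in-memory implementation backed by ConcurrentDictionary.
public sealed class InMemoryKeyValueStore : IKeyValueStore
{
    private readonly ConcurrentDictionary<string, byte[]> _store = new();

    public (bool isError, byte[]? result) GetKey(string key)
    {
        if (_store.TryGetValue(key, out var value))
        {
            return (false, value);
        }
        return (true, null);
    }

    public int SetKey(string key, byte[] value, bool removeKey)
    {
        if (removeKey)
        {
            // In this sketch, removing a missing key is reported as a failure.
            return _store.TryRemove(key, out _) ? 0 : 1;
        }

        // The indexer adds the key or overwrites the existing value.
        _store[key] = value;
        return 0;
    }
}
```

With ASP.NET Core dependency injection, a store like this would typically be registered as a singleton, for example builder.Services.AddSingleton<IKeyValueStore, InMemoryKeyValueStore>(), so that every call shares the same dictionary.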

To test it, we create to

"Simple, isn't it?" But what if we added a layer of real-time streaming?

Enhancing the Cache with gRPC Streams

gRPC's bidirectional streaming is a game-changer. Instead of single requests and responses, you have an open channel between the client and server:

  1. Persistent Connection: Once a client initiates a stream, the connection remains active.

On the client side:

    ...
    private Task StartServerStreamAsync(IServiceProvider serviceProvider, CancellationToken stoppingToken)
    {
        // Task.Run unwraps the async delegate, so the returned task completes only when ExecuteAsync does.
        return Task.Run(() => ExecuteAsync(serviceProvider, stoppingToken), stoppingToken);
    }
    
    private async Task ExecuteAsync(IServiceProvider serviceProvider, CancellationToken stoppingToken)
    {
        _logger.LogInformation("StartServerStreamAsync started");
        try
        {
            CacherClientProxy cacherClientProxy = serviceProvider.GetRequiredService<CacherClientProxy>();
            using var call = cacherClientProxy.ExchangeCommands(stoppingToken);
            while (await call.ResponseStream.MoveNext(stoppingToken))
            {
                var command = call.ResponseStream.Current;
                switch (command.CommandType)
                {
                    case CommandType.SetKey:
                        var setResult = SetKeyInLocalCache(command.Key, command.KeyValue.ToByteArray());
                        if (setResult)
                        {
                            await call.RequestStream.WriteAsync(
                                new CommandResponse { Result = CommandResult.Success }, stoppingToken);
                        }
                        else
                        {
                            await call.RequestStream.WriteAsync(
                                new CommandResponse
                                    { Result = CommandResult.Error, ErrorMessage = "Failed to set key" },
                                stoppingToken);
                        }
                        break;

                    case CommandType.GetKey:
                        var (notFound,getValue) = GetKeyFromLocalCache(command.Key);
                        if (notFound)
                        {
                            await call.RequestStream.WriteAsync(
                                new CommandResponse
                                    { Result = CommandResult.NotFound, KeyValue = ByteString.Empty },
                                stoppingToken);
                        }
                        else
                        {
                            await call.RequestStream.WriteAsync(
                                new CommandResponse
                                    { Result = CommandResult.Success, KeyValue = ByteString.CopyFrom(getValue) },
                                stoppingToken);
                        }
                        break;

                    case CommandType.ForceFlushCache:
                        var flushResult = ForceFlushCache();
                        if (flushResult)
                        {
                            await call.RequestStream.WriteAsync(
                                new CommandResponse { Result = CommandResult.Success }, stoppingToken);
                        }
                        else
                        {
                            await call.RequestStream.WriteAsync(
                                new CommandResponse
                                    { Result = CommandResult.Error, ErrorMessage = "Failed to flush cache" },
                                stoppingToken);
                        }
                        break;

                    case CommandType.Idle:
                        var (clientId,clientStats) = Idle();
                        await call.RequestStream.WriteAsync(
                            new CommandResponse
                                { Result = CommandResult.Success, ClientId = clientId, ClientStats = clientStats },
                            stoppingToken);
                        break;

                    case CommandType.GraceFullShutdown:
                        GraceFullShutdown();
                        await call.RequestStream.WriteAsync(new CommandResponse { Result = CommandResult.Success },
                            stoppingToken);
                        await call.RequestStream.CompleteAsync();
                        return;
                }
            }
        }
        catch (Exception e)
        {
            _logger.LogError(e, "Error in StartServerStreamAsync()");
        }
    }
    ...

On the server side:

    ...
    public override async Task ExchangeCommands(IAsyncStreamReader<CommandResponse> responseStream, IServerStreamWriter<CommandRequest> requestStream, ServerCallContext context)
    {
        using CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
        using SemaphoreSlim semaphoreSlim = new SemaphoreSlim(1, 1);
        
        var commandResponse = await ExecuteCommand(
            semaphoreSlim,
            new CommandRequest()
            {
                ServerTimestamp = Timestamp.FromDateTime(DateTime.UtcNow),
                CommandType = CommandType.Idle
            },
            responseStream,
            requestStream,
            context.CancellationToken
        );

        if (commandResponse == null)
        {
            throw new InvalidOperationException("commandResponse must have a value");
        }

        var clientInfo = RegisterClient(
            commandResponse.ClientId ?? throw new InvalidOperationException("ClientId must have a value"),
            semaphoreSlim, cancellationTokenSource);

        try
        {
            while (!cancellationTokenSource.IsCancellationRequested &&
                   !context.CancellationToken.IsCancellationRequested)
            {
                // Blocks the current thread until a command is queued or either token is cancelled.
                WaitHandle.WaitAny(new WaitHandle[]
                {
                    cancellationTokenSource.Token.WaitHandle,
                    context.CancellationToken.WaitHandle,
                    clientInfo.Commands.WaitHandle
                });

                if (cancellationTokenSource.IsCancellationRequested ||
                    context.CancellationToken.IsCancellationRequested)
                {
                    break;
                }

                if (clientInfo.Commands.TryDequeue(out CommandQueueItem commandQueueItem))
                {
                    try
                    {
                        commandResponse = await ExecuteCommand(semaphoreSlim,
                            commandQueueItem.Request,
                            responseStream,
                            requestStream,
                            context.CancellationToken
                        );
                        if (commandResponse == null)
                        {
                            throw new InvalidOperationException("commandResponse must have a value");
                        }

                        commandQueueItem.Response = commandResponse;
                    }
                    catch (Exception e)
                    {
                        commandQueueItem.Exception = e;
                        // Pass the exception itself so the stack trace is logged as well.
                        _logger.LogError(e, "ExchangeCommands() client={Client} failed", clientInfo.Key);
                    }
                    finally
                    {
                        commandQueueItem.ExecutionCompleted.Set();
                    }

                    if (commandQueueItem.Exception != null)
                    {
                        throw commandQueueItem.Exception;
                    }
                }
            }
        }
        finally
        {
            UnRegisterClient(clientInfo.Key);
        }
    }
    ...

  1. The server initiates the communication: once the client is connected, the server sends the first command to the client. The client sends back its identifier, then the server completes the initialization exchange by registering the client under that identifier.

  2. The server is ready to execute or broadcast commands to the connected client(s): clients are automatically de-registered when they disconnect or are revoked by the server.

    ...
    public async Task<IEnumerable<CommandResponse>> BroadCastCommandAsync(CommandRequest commandRequest, CancellationToken cancellationToken = default)
    {
        List<Task<CommandResponse>> tasks = new List<Task<CommandResponse>>();

        lock (_lock)
        {
            foreach (var clientEntry in _clients)
            {
                var clientInfo = clientEntry.Value;
                var commandQueueItem = new CommandQueueItem(commandRequest, new ManualResetEventSlim(false));

                clientInfo.Commands.Enqueue(commandQueueItem);

                tasks.Add(WaitForCommandCompletionAsync(commandQueueItem, cancellationToken));
            }
        }

        CommandResponse[] responses;
        try
        {
            responses = await Task.WhenAll(tasks);
        }
        catch (Exception ex)
        {
            throw new AggregateException("One or more clients failed to process the command.", ex);
        }

        return responses;
    }

    private async Task<CommandResponse> WaitForCommandCompletionAsync(CommandQueueItem commandQueueItem, CancellationToken cancellationToken)
    {
        await Task.Factory.StartNew(() => commandQueueItem.ExecutionCompleted.Wait(cancellationToken),
            cancellationToken,
            TaskCreationOptions.RunContinuationsAsynchronously,
            TaskScheduler.Default
        );

        if (commandQueueItem.IsError)
        {
            throw commandQueueItem.Exception!;
        }

        return commandQueueItem.Response!;
    }
    ...
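Both the server loop and BroadCastCommandAsync rely on a per-client command queue that exposes a WaitHandle, Enqueue and TryDequeue, but that type is not listed in this article. The following is only a sketch of how such a queue might work; the name SignalingQueue and its locking strategy are assumptions, and the repository's implementation may differ.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics.CodeAnalysis;
using System.Threading;

// Hypothetical stand-in for the type behind clientInfo.Commands.
public sealed class SignalingQueue<T> : IDisposable
{
    private readonly object _gate = new();
    private readonly Queue<T> _items = new();

    // Signaled while at least one item is queued.
    private readonly ManualResetEvent _available = new(false);

    // Can be passed to WaitHandle.WaitAny together with cancellation handles.
    public WaitHandle WaitHandle => _available;

    public void Enqueue(T item)
    {
        // The lock keeps the queue content and the signal state consistent.
        lock (_gate)
        {
            _items.Enqueue(item);
            _available.Set();
        }
    }

    public bool TryDequeue([MaybeNullWhen(false)] out T item)
    {
        lock (_gate)
        {
            bool found = _items.TryDequeue(out item);
            if (_items.Count == 0)
            {
                // Nothing left: the next wait blocks until Enqueue is called again.
                _available.Reset();
            }
            return found;
        }
    }

    public void Dispose() => _available.Dispose();
}
```

With a queue like this, the WaitHandle.WaitAny call in ExchangeCommands returns as soon as BroadCastCommandAsync enqueues a command, and the following TryDequeue picks it up. Resetting the event under the same lock as Enqueue avoids losing a signal when a producer and the consumer race.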

The full implementation is available on GitHub: https://github.com/goatreview/SimplegRPCCache

Concrete Case: Real-time Monitoring of Critical Hardware Equipment with gRPC Streams

Envision a massive industrial setup, brimming with an array of critical hardware equipment: turbines, generators, and heavy machinery, each requiring constant monitoring. Traditional systems might entail manual checks or delayed notifications, which, in case of any malfunction, could lead to costly downtimes or safety hazards.

With a distributed cache system enhanced by gRPC streams, as soon as any equipment's state changes—be it temperature spikes, pressure drops, or speed irregularities—a real-time update is dispatched to the monitoring systems. No longer are there lapses or manual checks. Engineers receive instantaneous notifications, allowing them to react swiftly, rectify potential issues, and ensure the continuous, safe operation of all machinery.

This isn't merely about efficiency; it's about ensuring safety, reducing downtime, and optimizing performance in some of the most critical environments on the planet.

Summary

  • gRPC in C# offers high-performance communication for distributed systems.
  • Distributed caching with gRPC optimizes data access and storage.
  • gRPC streams elevate real-time data processing, especially beneficial for critical real-time applications.
  • Real-time hardware monitoring using gRPC streams ensures safety and optimal performance in industrial settings.

Have a goat day 🐐