
Lessons learnt from writing asynchronous streaming gRPC services in C++

3 December 2019
  • Software Engineering

Introduction

We were building a service that receives market data and caches it for multiple clients to access. Our service is written in C++ while the clients are written in Java, so we needed a language-independent protocol for communication. We chose gRPC because it is a mature, high-performance framework that is widely used.
While the library performs well, its C++ interface is not intuitive and the documentation doesn’t cover all the common scenarios. In particular, we couldn’t find an example of an asynchronous server that returns a stream of data instead of just one response. This is a problem others have already reported in the gRPC issue tracker.

After some googling and experimenting (and pulling out a lot of hair in frustration), we got it working, so now we are sharing the example in the hope that it saves someone else’s time (and hair).

Creating a server from a gRPC example proto file

We assume that you know how to build the examples and are familiar with them. If not, please refer to gRPC’s documentation.
We also assume that you understand the basic asynchronous API (you can read about it here).
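
As a quick refresher, the asynchronous server is driven by a loop that pulls tags off a completion queue; every pending operation is tagged with a CallData pointer, and each popped tag advances that call’s little state machine. Simplified from the hello-world example (error handling elided):

void HandleRpcs()
{
    new CallData(&service_, cq_.get()); // the first CallData waits for a request

    void* tag;
    bool ok;
    while (cq_->Next(&tag, &ok))
    {
        // every tag we put into the queue is a CallData*, so we can
        // cast it back and let it advance its state machine
        static_cast<CallData*>(tag)->Proceed();
    }
}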

Let’s start with the existing asynchronous server example, which implements the service defined in this contract. It returns a single response, so the relevant code boils down to these three lines:

// build the single response, then send it and close the call in one step
reply_.set_message(prefix + request_.name());
status_ = FINISH;
responder_.Finish(reply_, Status::OK, this);

Now let’s have a look at the streaming version of the contract. The main difference here is that the service returns a stream of HelloReply messages:

rpc sayHello (HelloRequest) returns (stream HelloReply) {}
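
For reference, the request and reply messages in that contract are simple; note that num_greetings, which the server code below uses, is a string:

message HelloRequest {
  string name = 1;
  string num_greetings = 2;
}

message HelloReply {
  string message = 1;
}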

We need to generate the gRPC client and server interfaces. The provided Makefile doesn’t include hellostreamingworld.proto, so you’ll need to add it yourself. You can have a look at our modified Makefile to see how that can be done.

Running that Makefile will create 4 files:

hellostreamingworld.grpc.pb.cc
hellostreamingworld.grpc.pb.h
hellostreamingworld.pb.cc
hellostreamingworld.pb.h

The last step is to change the server’s code. For a streaming response, the way we send data back to the client is to call Write() multiple times, once per response, as can be seen in the route_guide synchronous example.
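
For reference, the synchronous pattern looks roughly like this (a simplified sketch modelled on route_guide’s ListFeatures; the real example also filters features by the requested rectangle):

// Synchronous streaming: each Write() blocks until the message has been
// accepted, so calling it in a loop is perfectly safe.
Status ListFeatures(ServerContext* context, const Rectangle* rectangle,
                    ServerWriter<Feature>* writer) override
{
    for (const Feature& feature : feature_list_)
    {
        writer->Write(feature);
    }
    return Status::OK;
}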

A naïve attempt to copy that pattern into the asynchronous example will not work:

responder_.Write(reply1, this);
responder_.Write(reply2, this);
responder_.Write(reply3, this);
responder_.Finish(Status::OK, this);

If you do that, the program will simply crash. The reason is that the asynchronous API allows only “1 outstanding asynchronous write on the same side of the same stream without waiting for the completion queue notification”. In other words, for each task taken from the completion queue you can call Write() at most once. Calling Write() also adds a new task to the completion queue, and it adds it to the front, which was a very unpleasant surprise to discover while debugging; we will go into the details later on. So if you need another write, you have to let the current call finish, wait for your task to be taken from the completion queue, and only then call Write() on that new task.

When you are done writing, you let the client know by calling Finish(). Alternatively, when you need to call Write() for the last time, you can call WriteAndFinish() instead and avoid spending a whole extra task just to call Finish() (a sketch of that variant follows the listing below):

else if (status_ == PROCESS)
{
    // each CallData object should create only one new CallData
    if (times_ == 0)
    {
        new CallData(service_, cq_);
    }

    if (times_++ >= 3) // we want to send the response 3 times (for whatever reason)
    {
        status_ = FINISH;
        responder_.Finish(Status::OK, this);
    }
    else
    {
        std::string prefix("Hello ");
        reply_.set_message(prefix + request_.name() + ", no " + request_.num_greetings());
        responder_.Write(reply_, this);
    }
}
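
As a sketch, the same loop can use WriteAndFinish() so that the last write and the finish share a single completion-queue event (field names as in the listing above):

else if (status_ == PROCESS)
{
    // each CallData object should create only one new CallData
    if (times_ == 0)
    {
        new CallData(service_, cq_);
    }

    std::string prefix("Hello ");
    reply_.set_message(prefix + request_.name() + ", no " + request_.num_greetings());

    if (++times_ >= 3) // the third write also closes the stream
    {
        status_ = FINISH;
        responder_.WriteAndFinish(reply_, grpc::WriteOptions(), Status::OK, this);
    }
    else
    {
        responder_.Write(reply_, this);
    }
}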

The modified server’s code can be found here. For testing you can use a simple (synchronous) client. We have also uploaded the Makefile that we used to build the modified examples; if you copy it into <grpc_directory>/examples/cpp/helloworld together with the examples, you should be able to build them.

All the modified examples together with the generated gRPC files can be found here.

How to add a task to the completion queue

Imagine a scenario where we pick a task from the completion queue but don’t have any data to send:

if (HasData()) 
{
    responder_.Write(reply_, this);
}
else
{
    // do nothing
}

This code doesn’t work because, when there is no data, we don’t call Write() (surprise!), which also means we don’t put a new task into the completion queue, so we never get the opportunity to call Write() again.

The solution is to manually put a new task into the completion queue whenever we don’t call Write(). Surprisingly, there is no straightforward way to do this seemingly trivial operation. The way we tackled the problem was with a grpc::Alarm object. The alarm adds a new task to the back of the completion queue when it expires or when it is cancelled; the difference is that if it expires, the event will have its success bit set.

if (HasData())
{
    responder_.Write(reply_, this);
}
else
{
    // alarm_ is a grpc::Alarm member of CallData; a local Alarm would be
    // cancelled by its destructor as soon as it went out of scope
    alarm_.Set(completion_queue_, std::chrono::system_clock::now(), this);
}

A bonus point: if you care about latency and don’t want to trigger extra system calls every time you put a task into the completion queue, you can avoid calling system_clock::now() by using gpr_now() instead:

if (HasData())
{
    responder_.Write(reply_, this);
}
else
{
    // as before, alarm_ is a grpc::Alarm member of CallData
    alarm_.Set(completion_queue_, gpr_now(gpr_clock_type::GPR_CLOCK_REALTIME), this);
}

An example showing how to add tasks to the completion queue can be found here.

How to make Write() put the task into the back of the completion queue

In our application, we have lots of clients connected to one gRPC server. A client connects to the server and says “Hey, I’m interested in thing x.” The server subscribes to some external source of information about thing x, and sends it on to the client. The client keeps its connection open indefinitely and the server sends information to it until the client disconnects. Simple, as long as we only have one client. Things are a little more complicated when we have multiple clients connected, because we need to have a strategy for scheduling writes. For our application, we just need to do a round-robin of clients: client 1 gets its batch of data, client 2 gets its batch and so on, looping back to client 1 when everyone has been served.

You might expect (I certainly did) that the completion queue would take care of the scheduling for us: we call Write(), gRPC puts that task at the back of the completion queue, job done, right? Let’s try it. First, we modify async_streaming_server.cc a little to make it more obvious which client is being served; the modified version is available here. Essentially, we just have the server print a client identifier whenever it sends a message. It also waits a second before writing each message, to hold the connection open longer and make the behaviour easier to observe.
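
The relevant change looks roughly like this (a sketch; client_id_ stands for whatever identifier the CallData is given on construction, and <iostream> and <thread> are assumed to be included):

else if (status_ == PROCESS)
{
    // slow the stream down and label it so the scheduling is visible
    std::this_thread::sleep_for(std::chrono::seconds(1));
    std::cout << "Client being processed: " << client_id_ << std::endl;

    // ... build reply_ and call responder_.Write(reply_, this) as before
}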

If we run the server and then start two clients at the same time, the output looks like this:

$ ./async_streaming_server_queue_to_front
Server listening on 0.0.0.0:50051
Created CallData 0
Client being processed: 0
Created CallData 1
Client being processed: 0
Client being processed: 0
Client being processed: 0
Client being processed: 1
Created CallData 2
Client being processed: 1
Client being processed: 1
Client being processed: 1

The server is actually sending everything to one client, and only then to the other! Tasks are put back on the front of the completion queue. In our application, where we continually send a stream of data as fast as we can, this doesn’t work: the first client is the only one that ever gets served. As you might expect, this is hilarious to debug when you don’t know about it.

Our workaround for this is simple, if a little inelegant. We add an extra state called PUSH_TO_BACK to our task, and then use the Alarm trick to move the task to the back of the queue the next time it is processed.
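
In terms of CallData’s members, that means roughly the following (a sketch; only the parts relevant to the trick are shown, and the names are assumed to match the listings above):

class CallData
{
    // ...
    enum CallStatus { CREATE, PROCESS, PUSH_TO_BACK, FINISH };
    CallStatus status_;   // the state machine gains one extra state
    grpc::Alarm alarm_;   // a member, so it isn't cancelled by going out of scope
};

The state machine itself then gains one extra branch: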

else if (status_ == PROCESS)
{
    // …
    responder_.Write(reply_, this);
    status_ = PUSH_TO_BACK;
}
else if (status_ == PUSH_TO_BACK)
{
    status_ = PROCESS;
    alarm_.Set(cq_, gpr_now(gpr_clock_type::GPR_CLOCK_REALTIME), this);
}

Running this, we see that the clients are processed in round-robin fashion, as I originally expected:

$ ./async_streaming_server_queue_to_back
Server listening on 0.0.0.0:50051
Created CallData 0
Client being processed: 0
Created CallData 1
Client being processed: 1
Created CallData 2
Client being processed: 0
Client being processed: 1
Client being processed: 0
Client being processed: 1
Client being processed: 0
Client being processed: 1

Conclusion

Our async gRPC server has been in production for over six months and is performing well. The result was worth the time and hard work it took to make gRPC behave as we needed. We hope others will find this post and our examples useful.


Hong Hai Chu & Stefans Mezulis – Software Engineering
