概述
因为gRPC 的异步调用代码写的比较绕,所以这篇文章主要用来记录一下 gRPC 的异步调用。
需要注意的是,gRPC 为了实现异步调用,使用的是 CompletionQueue 绑定进行 RPC 调用,实际写代码的时候会感觉到比较奇怪。相应的因为是异步的,所以会调用CompletionQueue::Next
来等待回包操作。这里先留个印象,下面讲流程的时候会比较清晰。
编译安装
为了完成我们的demo,没有安装gRPC的同学可以直接使用下面的命令进行安装:
$ export MY_INSTALL_DIR=$HOME/.local
$ mkdir -p $MY_INSTALL_DIR
$ export PATH="$MY_INSTALL_DIR/bin:$PATH"
# 工具安装
$ sudo apt install -y cmake #安装好cmke的同学省略
$ sudo apt install -y build-essential autoconf libtool pkg-config
# 下载项目
$ git clone --recurse-submodules -b v1.45.0 --depth 1 --shallow-submodules https://github.com/grpc/grpc
# 编译安装
$ cd grpc
$ mkdir -p cmake/build
$ pushd cmake/build
$ cmake -DgRPC_INSTALL=ON -DgRPC_BUILD_TESTS=OFF -DCMAKE_INSTALL_PREFIX=$INSTALL_DIR ../..
$ make -j
$ make install
$ popd
下面介绍的例子都使用官方的:https://github.com/grpc/grpc/blob/master/examples/cpp/helloworld/。
异步 Client
对于同步的 client 来说,由于调用远程方法时会阻塞当前线程,但是异步允许同时发送多个请求,并且不会阻塞。
那么我们在使用 gRPC的异步API的时候 需要做到以下几点才行:
- 由于要进行异步非阻塞的请求,那么在发送请求的时候肯定不能等待回包;
- 所有的回包都通过另一个线程异步处理,避免主流程阻塞;
- 回包的数据通过某种介质进行传递,gRPC 使用的是CompletionQueue 来进行传递。
整体流程从上面图可以看得出分为以下几个:
- 启动client,启动一个旁路线程循环获取 CompletionQueue 数据;
- 发送异步调用给 server;
- 如果 server 回包了,会将数据放入到 CompletionQueue 里面;
- 异步线程获取 CompletionQueue 数据并返回;
我们下面来看一下例子。
创建
首先是创建客户端以,然后异步线程处理 server 的回包,因为处理回包的时候会阻塞。
// 创建客户端
GreeterClient greeter(grpc::CreateChannel(
"localhost:50051", grpc::InsecureChannelCredentials()));
// 起新的线程,从队列中取出结果并处理
std::thread thread_ = std::thread(&GreeterClient::AsyncCompleteRpc, &greeter);
for (int i = 0; i < 100; i++) {
std::string user("world " + std::to_string(i));
// 发送rpc请求
greeter.SayHello(user); // The actual RPC call!
}
std::cout << "Press control-c to quit" << std::endl << std::endl;
// 阻塞
thread_.join(); // blocks forever
异步请求
这里会通过调用 SayHello 来发送请求:
class GreeterClient {
public:
explicit GreeterClient(std::shared_ptr<Channel> channel)
: stub_(Greeter::NewStub(channel)) {}
...
void SayHello(const std::string& user) {
HelloRequest request;
request.set_name(user);
// 用来存储 rpc 数据
AsyncClientCall* call = new AsyncClientCall;
// 这里是调用 Async 方法创建 RPC 对象,但是不会理吗开始进行 RPC请求
call->response_reader =
stub_->PrepareAsyncSayHello(&call->context, request, &cq_);
// 初始化RPC调用
call->response_reader->StartCall();
// 进行RPC请求,然后 call 对象作为一个 tag 放进去,会包数据会放到 reply中
// 这里不会阻塞等待请求
call->response_reader->Finish(&call->reply, &call->status, (void*) call);
}
private:
...
CompletionQueue cq_;
};
因为回包逻辑不在这里,所以在调用完 Finish 之后不需要等待可以直接返回。在调用完这个方法之后就可以等待 server 回包了,server 会将回包数据塞入到 cq_ 里面。
异步处理回包
异步处理回包的逻辑就在我们一开始创建的 thread 里面会循环调用。
void AsyncCompleteRpc() {
void* got_tag;
bool ok = false;
// 从队列里面取出回调信息,没有如果还没回包的话会阻塞
while (cq_.Next(&got_tag, &ok)) {
AsyncClientCall* call = static_cast<AsyncClientCall*>(got_tag);
GPR_ASSERT(ok);
if (call->status.ok())
// 获取到回包的数据
std::cout << "Greeter received: " << call->reply.message() << std::endl;
else
std::cout << "RPC failed" << std::endl;
// 销毁new的对象
delete call;
}
}
这里会一直循环调用 Next 方法处理 server 的 response,如果没有回包会一直阻塞,所以这里需要新起一个线程处理,避免阻塞主流程。
异步 Server
一般我们在写 Server 端的时候如果是同步操作的话,在收到请求之后立即处理,然后给 client 回包,这个回包的过程中需要等待回包完毕整个 RPC 请求才算结束,这里就存在阻塞等待的过程,而使用异步请求就是用来避免阻塞,可以在单个线程里面处理更多的信息量。
但是在理解 gRPC 异步 API 的时候还是会感觉非常的别扭,导致一开始看不太明白实例代码的含义。
首先,gRPC让你像流水线操作一样,要先准备一个 CallData 对象作为一个容器,然后 gRPC 会通过 ServerCompletionQueue 将各种事件发送到 CallData 对象中,让这个对象根据自身的状态进行处理。
然后处理完毕当前的事件之后还需要手动再创建一个 CallData 对象,这个对象是为下个 Client 请求准备的,整个过程就像流水线一样。
上面这个异步的过程还有一个小状态机在里面,全部都由 CallData 对象进行扭转:
-
CallData 对象刚创建的时候会从 CREATE 状态扭转为 PROCESS 状态,表示等待接收请求;
-
请求过来之后,首先会创建一个 CallData 对象,然后处理完后扭转为 FINISH 状态,等待给 Client 回包结束;
-
回包结束之后将 CallData 对象自身删除。
清楚了这个CallData 对象是用来做什么之后,下面我们来看一下整个 Server 的流程应该如下:
- Server 启动,注册,创建一个 CallData 对象,这个对象用来给下个 Client 请求准备的;
- 创建好的 CallData 对象会交给 gRPC 托管,有事件过来的时候会将事件放入到 CallData 对象对,然后以 ServerCompletionQueue 对象来进行通知;
- 等待 Client 请求过来…
- 有事件过来之后会从 ServerCompletionQueue 对象中解包出来数据,转为 CallData 对象调用 Proceed 方法,然后进行业务逻辑处理,并重新创建 CallData 对象,用来给下个 Client 请求准备的;
- 等待给 Client 的回包结束。。。
- 继续处理 ServerCompletionQueue 回过来的 event 事件,清理自身 CallData 对象。
由于这个图我也不知道怎么画了,算了摆烂,不画了,自己脑补ba~
下面看看代码。
启动注册
void Run() {
// 启动注册监听
std::string server_address("0.0.0.0:50051");
ServerBuilder builder;
builder.AddListeningPort(server_address, grpc::InsecureServerCredentials());
builder.RegisterService(&service_);
cq_ = builder.AddCompletionQueue();
server_ = builder.BuildAndStart();
std::cout << "Server listening on " << server_address << std::endl;
// 处理RPC
HandleRpcs();
}
启动主流程
std::unique_ptr<ServerCompletionQueue> cq_;
void HandleRpcs() {
// 这里其实是异步逻辑的处理地方
new CallData(&service_, cq_.get());
void* tag; // uniquely identifies a request.
bool ok;
while (true) {
// Next方法会阻塞,直到有下个请求过来,才会继续往下走
GPR_ASSERT(cq_->Next(&tag, &ok));
// 必须检查 Next 的返回值,这个返回值告诉我们是有事件到来
// 还是 cq_ 正在关闭。
GPR_ASSERT(ok);
// 转成 CallData 调用 Proceed
static_cast<CallData*>(tag)->Proceed();
}
}
主流程这里会创建 CallData 对象,然后不断循环从 cq 对象里面获取事件,这个 cq 就是一个等待队列,没有事件过来的时候会一直阻塞。有事件过来之后会从 cq_ 里面取出 tag 转换成 CallData对象调用 Proceed 方法。
创建 CallData & 逻辑处理 & 完成
class ServerImpl final {
...
private:
class CallData {
public:
CallData(Greeter::AsyncService* service, ServerCompletionQueue* cq)
: service_(service), cq_(cq), responder_(&ctx_), status_(CREATE) {
Proceed();
}
void Proceed() {
if (status_ == CREATE) {
// 首次进入,改变状态到 PROCESS
status_ = PROCESS;
// 需要注意的是,这里将 this 作为 tag 塞入到请求中作为唯一识别请求
service_->RequestSayHello(&ctx_, &request_, &responder_, cq_, cq_,
this);
} else if (status_ == PROCESS) {
// 在我们处理这个请求之前,创建一个新的 CallData 实例用于处理未来
// 的新请求。实例会在它的 FINISH 状态流程中释放自己占用的内存
new CallData(service_, cq_);
// 实际的业务逻辑了
std::string prefix("Hello ");
// 从request_中获取client请求数据,并设置回包数据
reply_.set_message(prefix + request_.name());
// 业务处理完毕,让 gRPC 运行时知道我们已经完成了,使用这个实例的内存
// 地址作为事件内唯一识别请求的 tag
status_ = FINISH;
responder_.Finish(reply_, Status::OK, this);
} else {
GPR_ASSERT(status_ == FINISH);
// 已经到达 FINISH 状态,释放自身占用内存(CallData)
delete this;
}
}
private:
Greeter::AsyncService* service_;
// 生产-消费队列,用来异步服务消息通知
ServerCompletionQueue* cq_;
// RPC 的上下文信息,用于例如压缩、鉴权以及发送元数据给客户等用途。
ServerContext ctx_;
// 从客户端接受到了什么
HelloRequest request_;
// 从客户端返回什么
HelloReply reply_;
// 用于回复客户端的方法
ServerAsyncResponseWriter<HelloReply> responder_;
// 状态机
enum CallStatus { CREATE, PROCESS, FINISH };
CallStatus status_; // The current serving state.
};
...
};
从这里我们就可以和上面的 HandleRpcs 方法联系起来。
- 首先是 new CallData 会直接调用 Proceed 方法,这个时候会进入到 if 的第一个分支,然后将自身 this 写入到 cq_ 中,并将状态扭转为 PROCESS;
- 这个时候会继续会到 HandleRpcs 方法的 while 循环中等待事件;
- 当有 Client 发送请求过来时会进入到 Proceed 的 if 第二个分支进行业务逻辑的处理;
- 这里首先会 new CallData 给下一个请求使用;
- 然后从 request_ 获取到 Client 请求参数并进行处理;
- 回包数据写入到 reply_ 中,最后调用 Finish 结束;
- 这个时候会继续会到 HandleRpcs 方法的 while 循环中等待给 Client 的 response 回包结束;
- 收到回包结束之后会继续调用到 Proceed 方法的 if 第三个分支,删除当前对象。
总结
其实对比 go 的 grpc 的异步 API 来看,不得不说 cpp 的 API 设计很有问题,咋一看之下,根本不知道 new CallData 有什么用,为啥 new 了之后不做任何事情,也没有 delete 操作,不会内存溢出吗?然后进入到构造方法中才发现逻辑都在构造函数中,这样的写代码的方式我只在这里看到过。
也有很多人吐槽它这个 api 的设计:https://github.com/grpc/grpc/issues/7352 ,官方也承诺了会修改,但是一晃 6年过去了也没有改进的意思。
Reference
https://grpc.io/docs/languages/cpp/quickstart/
https://grpc.io/docs/languages/cpp/async/
https://blog.miigon.net/posts/cn-so-difference-between-sync-and-async-grpc/