diff --git a/cpp/helloworld/greeter_async_server.cc b/cpp/helloworld/greeter_async_server.cc index 32c3113989178013374a34d48044d1952750ed89..47110a719f7b412503a9803dc2e2b18220e02f56 100644 --- a/cpp/helloworld/greeter_async_server.cc +++ b/cpp/helloworld/greeter_async_server.cc @@ -57,16 +57,13 @@ using helloworld::HelloRequest; using helloworld::HelloReply; using helloworld::Greeter; -static bool got_sigint = false; - class ServerImpl final { public: - ServerImpl() : service_(&service_cq_) {} + ServerImpl() : service_(&cq_) {} ~ServerImpl() { server_->Shutdown(); - rpc_cq_.Shutdown(); - service_cq_.Shutdown(); + cq_.Shutdown(); } // There is no shutdown handling in this code. @@ -79,46 +76,56 @@ class ServerImpl final { server_ = builder.BuildAndStart(); std::cout << "Server listening on " << server_address << std::endl; - while (true) { - CallData* rpc = new CallData(); - service_.RequestSayHello(&rpc->ctx, &rpc->request, &rpc->responder, - &rpc_cq_, rpc); - void* got_tag; - bool ok; - service_cq_.Next(&got_tag, &ok); - GPR_ASSERT(ok); - GPR_ASSERT(got_tag == rpc); - - std::thread t(&ServerImpl::HandleRpc, this, rpc); - t.detach(); - } + HandleRpcs(); } private: - struct CallData { - CallData() : responder(&ctx) {} - ServerContext ctx; - HelloRequest request; - HelloReply reply; - ServerAsyncResponseWriter<HelloReply> responder; + class CallData { + public: + CallData(Greeter::AsyncService* service, CompletionQueue* cq) + : service_(service), cq_(cq), responder_(&ctx_), status_(CREATE) { + Proceed(); + } + + void Proceed() { + if (status_ == CREATE) { + service_->RequestSayHello(&ctx_, &request_, &responder_, cq_, this); + status_ = PROCESS; + } else if (status_ == PROCESS) { + new CallData(service_, cq_); + std::string prefix("Hello "); + reply_.set_message(prefix + request_.name()); + responder_.Finish(reply_, Status::OK, this); + status_ = FINISH; + } else { + delete this; + } + } + + private: + Greeter::AsyncService* service_; + CompletionQueue* cq_; + ServerContext ctx_; + HelloRequest request_; + HelloReply reply_; + ServerAsyncResponseWriter<HelloReply> responder_; + enum CallStatus { CREATE, PROCESS, FINISH }; + CallStatus status_; }; - // Runs in a detached thread, processes rpc then deletes data. - void HandleRpc(CallData* rpc) { - std::string prefix("Hello "); - rpc->reply.set_message(prefix + rpc->request.name()); - rpc->responder.Finish(rpc->reply, Status::OK, &rpc->ctx); - void* got_tag; + // This can be run in multiple threads if needed. + void HandleRpcs() { + new CallData(&service_, &cq_); + void* tag; bool ok; - rpc_cq_.Next(&got_tag, &ok); - GPR_ASSERT(ok); - GPR_ASSERT(got_tag == &rpc->ctx); - - delete rpc; + while (true) { + cq_.Next(&tag, &ok); + GPR_ASSERT(ok); + static_cast<CallData*>(tag)->Proceed(); + } } - CompletionQueue service_cq_; - CompletionQueue rpc_cq_; + CompletionQueue cq_; Greeter::AsyncService service_; std::unique_ptr<Server> server_; };