Skip to content
Snippets Groups Projects
Commit deef1970 authored by David Garcia Quintas's avatar David Garcia Quintas
Browse files

async greeter server docs

parent fd2d36bf
No related branches found
No related tags found
No related merge requests found
...@@ -43,6 +43,7 @@ ...@@ -43,6 +43,7 @@
#include <grpc++/server_builder.h> #include <grpc++/server_builder.h>
#include <grpc++/server_context.h> #include <grpc++/server_context.h>
#include <grpc++/server_credentials.h> #include <grpc++/server_credentials.h>
#include "helloworld.grpc.pb.h" #include "helloworld.grpc.pb.h"
using grpc::Server; using grpc::Server;
...@@ -59,6 +60,7 @@ class ServerImpl final { ...@@ -59,6 +60,7 @@ class ServerImpl final {
public: public:
~ServerImpl() { ~ServerImpl() {
server_->Shutdown(); server_->Shutdown();
// Always shutdown the completion queue after the server.
cq_->Shutdown(); cq_->Shutdown();
} }
...@@ -67,56 +69,101 @@ class ServerImpl final { ...@@ -67,56 +69,101 @@ class ServerImpl final {
std::string server_address("0.0.0.0:50051"); std::string server_address("0.0.0.0:50051");
ServerBuilder builder; ServerBuilder builder;
// Listen on the given address without any authentication mechanism.
builder.AddListeningPort(server_address, grpc::InsecureServerCredentials()); builder.AddListeningPort(server_address, grpc::InsecureServerCredentials());
// Register "service_" as the instance through which we'll communicate with
// clients. In this case it corresponds to an *asynchronous* service.
builder.RegisterAsyncService(&service_); builder.RegisterAsyncService(&service_);
// Get hold of the completion queue used for the asynchronous communication
// with the gRPC runtime.
cq_ = builder.AddCompletionQueue(); cq_ = builder.AddCompletionQueue();
// Finally assemble the server.
server_ = builder.BuildAndStart(); server_ = builder.BuildAndStart();
std::cout << "Server listening on " << server_address << std::endl; std::cout << "Server listening on " << server_address << std::endl;
// Proceed to the server's main loop.
HandleRpcs(); HandleRpcs();
} }
private: private:
// Class encompasing the state and logic needed to serve a request.
class CallData { class CallData {
public: public:
// Take in the "service" instance (in this case representing an asynchronous
// server) and the completion queue "cq" used for asynchronous communication
// with the gRPC runtime.
CallData(Greeter::AsyncService* service, ServerCompletionQueue* cq) CallData(Greeter::AsyncService* service, ServerCompletionQueue* cq)
: service_(service), cq_(cq), responder_(&ctx_), status_(CREATE) { : service_(service), cq_(cq), responder_(&ctx_), status_(CREATE) {
// Invoke the serving logic right away.
Proceed(); Proceed();
} }
void Proceed() { void Proceed() {
if (status_ == CREATE) { if (status_ == CREATE) {
// As part of the initial CREATE state, we *request* that the system
// start processing SayHello requests. In this request, "this" acts are
// the tag uniquely identifying the request (so that different CallData
// instances can serve different requests concurrently), in this case
// the memory address of this CallData instance.
service_->RequestSayHello(&ctx_, &request_, &responder_, cq_, cq_, service_->RequestSayHello(&ctx_, &request_, &responder_, cq_, cq_,
this); this);
// Make this instance progress to the PROCESS state.
status_ = PROCESS; status_ = PROCESS;
} else if (status_ == PROCESS) { } else if (status_ == PROCESS) {
// Spawn a new CallData instance to serve new clients while we process
// the one for this CallData. The instance will deallocate itself as
// part of its FINISH state.
new CallData(service_, cq_); new CallData(service_, cq_);
// The actual processing.
std::string prefix("Hello "); std::string prefix("Hello ");
reply_.set_message(prefix + request_.name()); reply_.set_message(prefix + request_.name());
// And we are done! Let the gRPC runtime now we've finished, using the
// memory address of this instance as the uniquely identifying tag for
// the event.
responder_.Finish(reply_, Status::OK, this); responder_.Finish(reply_, Status::OK, this);
status_ = FINISH; status_ = FINISH;
} else { } else {
GPR_ASSERT(status_ == FINISH);
// Once in the FINISH state, deallocate ourselves (CallData).
delete this; delete this;
} }
} }
private: private:
// The means of communication with the gRPC runtime for an asynchronous
// server.
Greeter::AsyncService* service_; Greeter::AsyncService* service_;
// The producer-consumer queue where for asynchronous server notifications.
ServerCompletionQueue* cq_; ServerCompletionQueue* cq_;
// Context for the server, allowing to tweak aspects of it such as the use
// of compression, authentication, etc.
ServerContext ctx_; ServerContext ctx_;
// What we get from the client.
HelloRequest request_; HelloRequest request_;
// What we send back to the client.
HelloReply reply_; HelloReply reply_;
// The means to get back to the client.
ServerAsyncResponseWriter<HelloReply> responder_; ServerAsyncResponseWriter<HelloReply> responder_;
// Let's implement a tiny state machine with the following states.
enum CallStatus { CREATE, PROCESS, FINISH }; enum CallStatus { CREATE, PROCESS, FINISH };
CallStatus status_; CallStatus status_; // The current serving state.
}; };
// This can be run in multiple threads if needed. // This can be run in multiple threads if needed.
void HandleRpcs() { void HandleRpcs() {
// Spawn a new CallData instance to serve new clients.
new CallData(&service_, cq_.get()); new CallData(&service_, cq_.get());
void* tag; void* tag; // uniquely identifies a request.
bool ok; bool ok;
while (true) { while (true) {
// Block waiting to read the next event from the completion queue. The
// event is uniquely identified by its tag, which in this case is the
// memory address of a CallData instance.
cq_->Next(&tag, &ok); cq_->Next(&tag, &ok);
GPR_ASSERT(ok); GPR_ASSERT(ok);
static_cast<CallData*>(tag)->Proceed(); static_cast<CallData*>(tag)->Proceed();
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment