Skip to content
Snippets Groups Projects
Commit 4191ea7d authored by Vijay Pai's avatar Vijay Pai
Browse files

Merge pull request #2028 from ctiller/more-is-better-than-less

Make async server use one CQ per server thread
parents 77ece807 51f938f1
No related branches found
No related tags found
No related merge requests found
...@@ -73,31 +73,35 @@ class AsyncQpsServerTest : public Server { ...@@ -73,31 +73,35 @@ class AsyncQpsServerTest : public Server {
gpr_free(server_address); gpr_free(server_address);
builder.RegisterAsyncService(&async_service_); builder.RegisterAsyncService(&async_service_);
srv_cq_ = builder.AddCompletionQueue(); for (int i = 0; i < config.threads(); i++) {
srv_cqs_.emplace_back(std::move(builder.AddCompletionQueue()));
}
server_ = builder.BuildAndStart(); server_ = builder.BuildAndStart();
using namespace std::placeholders; using namespace std::placeholders;
request_unary_ = for (int i = 0; i < 10; i++) {
std::bind(&TestService::AsyncService::RequestUnaryCall, &async_service_, for (int j = 0; j < config.threads(); j++) {
_1, _2, _3, srv_cq_.get(), srv_cq_.get(), _4); auto request_unary = std::bind(
request_streaming_ = &TestService::AsyncService::RequestUnaryCall, &async_service_, _1,
std::bind(&TestService::AsyncService::RequestStreamingCall, _2, _3, srv_cqs_[j].get(), srv_cqs_[j].get(), _4);
&async_service_, _1, _2, srv_cq_.get(), srv_cq_.get(), _3); auto request_streaming = std::bind(
for (int i = 0; i < 100; i++) { &TestService::AsyncService::RequestStreamingCall, &async_service_,
contexts_.push_front( _1, _2, srv_cqs_[j].get(), srv_cqs_[j].get(), _3);
new ServerRpcContextUnaryImpl<SimpleRequest, SimpleResponse>( contexts_.push_front(
request_unary_, ProcessRPC)); new ServerRpcContextUnaryImpl<SimpleRequest, SimpleResponse>(
contexts_.push_front( request_unary, ProcessRPC));
new ServerRpcContextStreamingImpl<SimpleRequest, SimpleResponse>( contexts_.push_front(
request_streaming_, ProcessRPC)); new ServerRpcContextStreamingImpl<SimpleRequest, SimpleResponse>(
request_streaming, ProcessRPC));
}
} }
for (int i = 0; i < config.threads(); i++) { for (int i = 0; i < config.threads(); i++) {
threads_.push_back(std::thread([=]() { threads_.push_back(std::thread([=]() {
// Wait until work is available or we are shutting down // Wait until work is available or we are shutting down
bool ok; bool ok;
void *got_tag; void *got_tag;
while (srv_cq_->Next(&got_tag, &ok)) { while (srv_cqs_[i]->Next(&got_tag, &ok)) {
ServerRpcContext *ctx = detag(got_tag); ServerRpcContext *ctx = detag(got_tag);
// The tag is a pointer to an RPC context to invoke // The tag is a pointer to an RPC context to invoke
bool still_going = ctx->RunNextState(ok); bool still_going = ctx->RunNextState(ok);
...@@ -125,11 +129,13 @@ class AsyncQpsServerTest : public Server { ...@@ -125,11 +129,13 @@ class AsyncQpsServerTest : public Server {
for (auto thr = threads_.begin(); thr != threads_.end(); thr++) { for (auto thr = threads_.begin(); thr != threads_.end(); thr++) {
thr->join(); thr->join();
} }
srv_cq_->Shutdown(); for (auto cq = srv_cqs_.begin(); cq != srv_cqs_.end(); ++cq) {
bool ok; (*cq)->Shutdown();
void *got_tag; bool ok;
while (srv_cq_->Next(&got_tag, &ok)) void *got_tag;
; while ((*cq)->Next(&got_tag, &ok))
;
}
while (!contexts_.empty()) { while (!contexts_.empty()) {
delete contexts_.front(); delete contexts_.front();
contexts_.pop_front(); contexts_.pop_front();
...@@ -306,15 +312,8 @@ class AsyncQpsServerTest : public Server { ...@@ -306,15 +312,8 @@ class AsyncQpsServerTest : public Server {
} }
std::vector<std::thread> threads_; std::vector<std::thread> threads_;
std::unique_ptr<grpc::Server> server_; std::unique_ptr<grpc::Server> server_;
std::unique_ptr<grpc::ServerCompletionQueue> srv_cq_; std::vector<std::unique_ptr<grpc::ServerCompletionQueue>> srv_cqs_;
TestService::AsyncService async_service_; TestService::AsyncService async_service_;
std::function<void(ServerContext *, SimpleRequest *,
grpc::ServerAsyncResponseWriter<SimpleResponse> *, void *)>
request_unary_;
std::function<void(
ServerContext *,
grpc::ServerAsyncReaderWriter<SimpleResponse, SimpleRequest> *, void *)>
request_streaming_;
std::forward_list<ServerRpcContext *> contexts_; std::forward_list<ServerRpcContext *> contexts_;
std::mutex shutdown_mutex_; std::mutex shutdown_mutex_;
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment