Skip to content
Snippets Groups Projects
Commit 51f938f1 authored by Craig Tiller's avatar Craig Tiller
Browse files

Make async server use one CQ per server thread

parent 77ece807
No related branches found
No related tags found
No related merge requests found
...@@ -73,31 +73,35 @@ class AsyncQpsServerTest : public Server { ...@@ -73,31 +73,35 @@ class AsyncQpsServerTest : public Server {
gpr_free(server_address); gpr_free(server_address);
builder.RegisterAsyncService(&async_service_); builder.RegisterAsyncService(&async_service_);
srv_cq_ = builder.AddCompletionQueue(); for (int i = 0; i < config.threads(); i++) {
srv_cqs_.emplace_back(std::move(builder.AddCompletionQueue()));
}
server_ = builder.BuildAndStart(); server_ = builder.BuildAndStart();
using namespace std::placeholders; using namespace std::placeholders;
request_unary_ = for (int i = 0; i < 10; i++) {
std::bind(&TestService::AsyncService::RequestUnaryCall, &async_service_, for (int j = 0; j < config.threads(); j++) {
_1, _2, _3, srv_cq_.get(), srv_cq_.get(), _4); auto request_unary = std::bind(
request_streaming_ = &TestService::AsyncService::RequestUnaryCall, &async_service_, _1,
std::bind(&TestService::AsyncService::RequestStreamingCall, _2, _3, srv_cqs_[j].get(), srv_cqs_[j].get(), _4);
&async_service_, _1, _2, srv_cq_.get(), srv_cq_.get(), _3); auto request_streaming = std::bind(
for (int i = 0; i < 100; i++) { &TestService::AsyncService::RequestStreamingCall, &async_service_,
contexts_.push_front( _1, _2, srv_cqs_[j].get(), srv_cqs_[j].get(), _3);
new ServerRpcContextUnaryImpl<SimpleRequest, SimpleResponse>( contexts_.push_front(
request_unary_, ProcessRPC)); new ServerRpcContextUnaryImpl<SimpleRequest, SimpleResponse>(
contexts_.push_front( request_unary, ProcessRPC));
new ServerRpcContextStreamingImpl<SimpleRequest, SimpleResponse>( contexts_.push_front(
request_streaming_, ProcessRPC)); new ServerRpcContextStreamingImpl<SimpleRequest, SimpleResponse>(
request_streaming, ProcessRPC));
}
} }
for (int i = 0; i < config.threads(); i++) { for (int i = 0; i < config.threads(); i++) {
threads_.push_back(std::thread([=]() { threads_.push_back(std::thread([=]() {
// Wait until work is available or we are shutting down // Wait until work is available or we are shutting down
bool ok; bool ok;
void *got_tag; void *got_tag;
while (srv_cq_->Next(&got_tag, &ok)) { while (srv_cqs_[i]->Next(&got_tag, &ok)) {
ServerRpcContext *ctx = detag(got_tag); ServerRpcContext *ctx = detag(got_tag);
// The tag is a pointer to an RPC context to invoke // The tag is a pointer to an RPC context to invoke
bool still_going = ctx->RunNextState(ok); bool still_going = ctx->RunNextState(ok);
...@@ -125,11 +129,13 @@ class AsyncQpsServerTest : public Server { ...@@ -125,11 +129,13 @@ class AsyncQpsServerTest : public Server {
for (auto thr = threads_.begin(); thr != threads_.end(); thr++) { for (auto thr = threads_.begin(); thr != threads_.end(); thr++) {
thr->join(); thr->join();
} }
srv_cq_->Shutdown(); for (auto cq = srv_cqs_.begin(); cq != srv_cqs_.end(); ++cq) {
bool ok; (*cq)->Shutdown();
void *got_tag; bool ok;
while (srv_cq_->Next(&got_tag, &ok)) void *got_tag;
; while ((*cq)->Next(&got_tag, &ok))
;
}
while (!contexts_.empty()) { while (!contexts_.empty()) {
delete contexts_.front(); delete contexts_.front();
contexts_.pop_front(); contexts_.pop_front();
...@@ -306,15 +312,8 @@ class AsyncQpsServerTest : public Server { ...@@ -306,15 +312,8 @@ class AsyncQpsServerTest : public Server {
} }
std::vector<std::thread> threads_; std::vector<std::thread> threads_;
std::unique_ptr<grpc::Server> server_; std::unique_ptr<grpc::Server> server_;
std::unique_ptr<grpc::ServerCompletionQueue> srv_cq_; std::vector<std::unique_ptr<grpc::ServerCompletionQueue>> srv_cqs_;
TestService::AsyncService async_service_; TestService::AsyncService async_service_;
std::function<void(ServerContext *, SimpleRequest *,
grpc::ServerAsyncResponseWriter<SimpleResponse> *, void *)>
request_unary_;
std::function<void(
ServerContext *,
grpc::ServerAsyncReaderWriter<SimpleResponse, SimpleRequest> *, void *)>
request_streaming_;
std::forward_list<ServerRpcContext *> contexts_; std::forward_list<ServerRpcContext *> contexts_;
std::mutex shutdown_mutex_; std::mutex shutdown_mutex_;
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment