diff --git a/include/grpc++/server.h b/include/grpc++/server.h index b02c4130d9c8e96f3ea7e5c78e06d3a50c5dc4f8..daa3f0a6617a8c9aa1e23f64195facfa6aefc7a0 100644 --- a/include/grpc++/server.h +++ b/include/grpc++/server.h @@ -87,8 +87,7 @@ class Server { void ScheduleCallback(); // Completion queue. - std::unique_ptr<CompletionQueue> cq_sync_; - std::unique_ptr<CompletionQueue> cq_async_; + CompletionQueue cq_; // Sever status std::mutex mu_; diff --git a/include/grpc/grpc.h b/include/grpc/grpc.h index 7733f8bb2aede5bd09110390a9998e692dcf0fac..561bc9a5a951504fcbebe3812ab22637a82a5477 100644 --- a/include/grpc/grpc.h +++ b/include/grpc/grpc.h @@ -553,7 +553,6 @@ grpc_call_error grpc_server_request_call_old(grpc_server *server, grpc_call_error grpc_server_request_call( grpc_server *server, grpc_call **call, grpc_call_details *details, grpc_metadata_array *request_metadata, - grpc_completion_queue *cq_when_rpc_available, grpc_completion_queue *cq_bound_to_call, void *tag_new); @@ -564,7 +563,6 @@ grpc_call_error grpc_server_request_registered_call( grpc_server *server, void *registered_method, grpc_call **call, gpr_timespec *deadline, grpc_metadata_array *request_metadata, grpc_byte_buffer **optional_payload, - grpc_completion_queue *cq_when_rpc_available, grpc_completion_queue *cq_bound_to_call, void *tag_new); /* Create a server */ diff --git a/src/core/surface/server.c b/src/core/surface/server.c index 93994e6bdda8014a9fc8305fbd4949b70d57889e..c5f49a091ece9ade85e2ad693e803987073f0b73 100644 --- a/src/core/surface/server.c +++ b/src/core/surface/server.c @@ -74,14 +74,12 @@ typedef struct { void *tag; union { struct { - grpc_completion_queue *cq_new; grpc_completion_queue *cq_bind; grpc_call **call; grpc_call_details *details; grpc_metadata_array *initial_metadata; } batch; struct { - grpc_completion_queue *cq_new; grpc_completion_queue *cq_bind; grpc_call **call; registered_method *registered_method; @@ -174,8 +172,6 @@ struct call_data { call_data **root[CALL_LIST_COUNT]; call_link links[CALL_LIST_COUNT]; - - grpc_completion_queue *cq_new; }; #define SERVER_FROM_CALL_ELEM(elem) \ @@ -187,8 +183,7 @@ static void begin_call(grpc_server *server, call_data *calld, requested_call *rc); static void fail_call(grpc_server *server, requested_call *rc); -static int call_list_join(call_data **root, call_data *call, - call_list list) { +static int call_list_join(call_data **root, call_data *call, call_list list) { GPR_ASSERT(!call->root[list]); call->root[list] = root; if (!*root) { @@ -290,7 +285,10 @@ static void destroy_channel(channel_data *chand) { grpc_iomgr_add_callback(finish_destroy_channel, chand); } -static void finish_start_new_rpc_and_unlock(grpc_server *server, grpc_call_element *elem, call_data **pending_root, requested_call_array *array) { +static void finish_start_new_rpc_and_unlock(grpc_server *server, + grpc_call_element *elem, + call_data **pending_root, + requested_call_array *array) { requested_call rc; call_data *calld = elem->call_data; if (array->count == 0) { @@ -318,25 +316,32 @@ static void start_new_rpc(grpc_call_element *elem) { /* check for an exact match with host */ hash = GRPC_MDSTR_KV_HASH(calld->host->hash, calld->path->hash); for (i = 0; i < chand->registered_method_max_probes; i++) { - rm = &chand->registered_methods[(hash + i) % chand->registered_method_slots]; + rm = &chand->registered_methods[(hash + i) % + chand->registered_method_slots]; if (!rm) break; if (rm->host != calld->host) continue; if (rm->method != calld->path) continue; - finish_start_new_rpc_and_unlock(server, elem, &rm->server_registered_method->pending, &rm->server_registered_method->requested); + finish_start_new_rpc_and_unlock(server, elem, + &rm->server_registered_method->pending, + &rm->server_registered_method->requested); return; } /* check for a wildcard method definition (no host set) */ hash = GRPC_MDSTR_KV_HASH(0, calld->path->hash); for (i = 0; i < chand->registered_method_max_probes; i++) { - rm = &chand->registered_methods[(hash + i) % chand->registered_method_slots]; + rm = &chand->registered_methods[(hash + i) % + chand->registered_method_slots]; if (!rm) break; if (rm->host != NULL) continue; if (rm->method != calld->path) continue; - finish_start_new_rpc_and_unlock(server, elem, &rm->server_registered_method->pending, &rm->server_registered_method->requested); + finish_start_new_rpc_and_unlock(server, elem, + &rm->server_registered_method->pending, + &rm->server_registered_method->requested); return; } } - finish_start_new_rpc_and_unlock(server, elem, &server->lists[PENDING_START], &server->requested_calls); + finish_start_new_rpc_and_unlock(server, elem, &server->lists[PENDING_START], + &server->requested_calls); } static void kill_zombie(void *elem, int success) { @@ -684,7 +689,10 @@ grpc_transport_setup_result grpc_server_setup_transport( host = rm->host ? grpc_mdstr_from_string(mdctx, rm->host) : NULL; method = grpc_mdstr_from_string(mdctx, rm->host); hash = GRPC_MDSTR_KV_HASH(host ? host->hash : 0, method->hash); - for (probes = 0; chand->registered_methods[(hash + probes) % slots].server_registered_method != NULL; probes++); + for (probes = 0; chand->registered_methods[(hash + probes) % slots] + .server_registered_method != NULL; + probes++) + ; if (probes > max_probes) max_probes = probes; crm = &chand->registered_methods[(hash + probes) % slots]; crm->server_registered_method = rm; @@ -829,10 +837,12 @@ static grpc_call_error queue_call_request(grpc_server *server, switch (rc->type) { case LEGACY_CALL: case BATCH_CALL: - calld = call_list_remove_head(&server->lists[PENDING_START], PENDING_START); + calld = + call_list_remove_head(&server->lists[PENDING_START], PENDING_START); break; case REGISTERED_CALL: - calld = call_list_remove_head(&rc->data.registered.registered_method->pending, PENDING_START); + calld = call_list_remove_head( + &rc->data.registered.registered_method->pending, PENDING_START); break; } if (calld) { @@ -851,13 +861,12 @@ static grpc_call_error queue_call_request(grpc_server *server, grpc_call_error grpc_server_request_call(grpc_server *server, grpc_call **call, grpc_call_details *details, grpc_metadata_array *initial_metadata, - grpc_completion_queue *cq_new, - grpc_completion_queue *cq_bind, void *tag) { + grpc_completion_queue *cq_bind, + void *tag) { requested_call rc; - grpc_cq_begin_op(cq_new, NULL, GRPC_OP_COMPLETE); + grpc_cq_begin_op(server->cq, NULL, GRPC_OP_COMPLETE); rc.type = BATCH_CALL; rc.tag = tag; - rc.data.batch.cq_new = cq_new; rc.data.batch.cq_bind = cq_bind; rc.data.batch.call = call; rc.data.batch.details = details; @@ -868,13 +877,12 @@ grpc_call_error grpc_server_request_call(grpc_server *server, grpc_call **call, grpc_call_error grpc_server_request_registered_call( grpc_server *server, void *registered_method, grpc_call **call, gpr_timespec *deadline, grpc_metadata_array *initial_metadata, - grpc_byte_buffer **optional_payload, grpc_completion_queue *cq_new, grpc_completion_queue *cq_bind, + grpc_byte_buffer **optional_payload, grpc_completion_queue *cq_bind, void *tag) { requested_call rc; - grpc_cq_begin_op(cq_new, NULL, GRPC_OP_COMPLETE); + grpc_cq_begin_op(server->cq, NULL, GRPC_OP_COMPLETE); rc.type = REGISTERED_CALL; rc.tag = tag; - rc.data.registered.cq_new = cq_new; rc.data.registered.cq_bind = cq_bind; rc.data.registered.call = call; rc.data.registered.registered_method = registered_method; @@ -896,7 +904,8 @@ grpc_call_error grpc_server_request_call_old(grpc_server *server, static void publish_legacy(grpc_call *call, grpc_op_error status, void *tag); static void publish_registered_or_batch(grpc_call *call, grpc_op_error status, void *tag); -static void publish_was_not_set(grpc_call *call, grpc_op_error status, void *tag) { +static void publish_was_not_set(grpc_call *call, grpc_op_error status, + void *tag) { abort(); } @@ -942,7 +951,6 @@ static void begin_call(grpc_server *server, call_data *calld, r->op = GRPC_IOREQ_RECV_INITIAL_METADATA; r->data.recv_metadata = rc->data.batch.initial_metadata; r++; - calld->cq_new = rc->data.batch.cq_new; publish = publish_registered_or_batch; break; case REGISTERED_CALL: @@ -957,7 +965,6 @@ static void begin_call(grpc_server *server, call_data *calld, r->data.recv_message = rc->data.registered.optional_payload; r++; } - calld->cq_new = rc->data.registered.cq_new; publish = publish_registered_or_batch; break; } @@ -976,14 +983,14 @@ static void fail_call(grpc_server *server, requested_call *rc) { case BATCH_CALL: *rc->data.batch.call = NULL; rc->data.batch.initial_metadata->count = 0; - grpc_cq_end_op_complete(rc->data.batch.cq_new, rc->tag, NULL, do_nothing, - NULL, GRPC_OP_ERROR); + grpc_cq_end_op_complete(server->cq, rc->tag, NULL, do_nothing, NULL, + GRPC_OP_ERROR); break; case REGISTERED_CALL: *rc->data.registered.call = NULL; rc->data.registered.initial_metadata->count = 0; - grpc_cq_end_op_complete(rc->data.registered.cq_new, rc->tag, NULL, do_nothing, - NULL, GRPC_OP_ERROR); + grpc_cq_end_op_complete(server->cq, rc->tag, NULL, do_nothing, NULL, + GRPC_OP_ERROR); break; } } @@ -1011,9 +1018,9 @@ static void publish_registered_or_batch(grpc_call *call, grpc_op_error status, void *tag) { grpc_call_element *elem = grpc_call_stack_element(grpc_call_get_call_stack(call), 0); - call_data *calld = elem->call_data; - grpc_cq_end_op_complete(calld->cq_new, tag, call, - do_nothing, NULL, status); + channel_data *chand = elem->channel_data; + grpc_server *server = chand->server; + grpc_cq_end_op_complete(server->cq, tag, call, do_nothing, NULL, status); } const grpc_channel_args *grpc_server_get_channel_args(grpc_server *server) { diff --git a/src/cpp/server/server.cc b/src/cpp/server/server.cc index 6d014a55f36406fb0a3f59b5f1920db66544d05b..5f59a382c563a1b86177f52e2d2f95709bab44b1 100644 --- a/src/cpp/server/server.cc +++ b/src/cpp/server/server.cc @@ -56,9 +56,9 @@ Server::Server(ThreadPoolInterface *thread_pool, bool thread_pool_owned, thread_pool_owned_(thread_pool_owned), secure_(creds != nullptr) { if (creds) { - server_ = grpc_secure_server_create(creds->GetRawCreds(), nullptr, nullptr); + server_ = grpc_secure_server_create(creds->GetRawCreds(), cq_.cq(), nullptr); } else { - server_ = grpc_server_create(nullptr, nullptr); + server_ = grpc_server_create(cq_.cq(), nullptr); } } @@ -82,9 +82,6 @@ Server::~Server() { } bool Server::RegisterService(RpcService *service) { - if (!cq_sync_) { - cq_sync_.reset(new CompletionQueue); - } for (int i = 0; i < service->GetMethodCount(); ++i) { RpcServiceMethod *method = service->GetMethod(i); void *tag = grpc_server_register_method(server_, method->name(), nullptr); @@ -131,14 +128,14 @@ class Server::MethodRequestData final : public CompletionQueueTag { return mrd; } - void Request(grpc_server *server, CompletionQueue *cq) { + void Request(grpc_server *server) { GPR_ASSERT(!in_flight_); in_flight_ = true; cq_ = grpc_completion_queue_create(); GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_registered_call( server, tag_, &call_, &deadline_, &request_metadata_, - has_request_payload_ ? &request_payload_ : nullptr, cq->cq(), + has_request_payload_ ? &request_payload_ : nullptr, cq_, this)); } @@ -212,9 +209,9 @@ bool Server::Start() { grpc_server_start(server_); // Start processing rpcs. - if (cq_sync_) { + if (!methods_.empty()) { for (auto &m : methods_) { - m.Request(server_, cq_sync_.get()); + m.Request(server_); } ScheduleCallback(); @@ -249,12 +246,12 @@ void Server::ScheduleCallback() { void Server::RunRpc() { // Wait for one more incoming rpc. bool ok; - auto *mrd = MethodRequestData::Wait(cq_sync_.get(), &ok); + auto *mrd = MethodRequestData::Wait(&cq_, &ok); if (mrd) { MethodRequestData::CallData cd(mrd); if (ok) { - mrd->Request(server_, cq_sync_.get()); + mrd->Request(server_); ScheduleCallback(); cd.Run();