Skip to content
Snippets Groups Projects
Commit 3b29b566 authored by Craig Tiller's avatar Craig Tiller
Browse files

Just use one completion queue per server for delivering completions

This simplifies (drastically) the polling story, although will slightly
complicate mixing sync & async servers - but we're not there yet.
parent 06ed31e9
No related branches found
No related tags found
No related merge requests found
...@@ -87,8 +87,7 @@ class Server { ...@@ -87,8 +87,7 @@ class Server {
void ScheduleCallback(); void ScheduleCallback();
// Completion queue. // Completion queue.
std::unique_ptr<CompletionQueue> cq_sync_; CompletionQueue cq_;
std::unique_ptr<CompletionQueue> cq_async_;
// Sever status // Sever status
std::mutex mu_; std::mutex mu_;
......
...@@ -553,7 +553,6 @@ grpc_call_error grpc_server_request_call_old(grpc_server *server, ...@@ -553,7 +553,6 @@ grpc_call_error grpc_server_request_call_old(grpc_server *server,
grpc_call_error grpc_server_request_call( grpc_call_error grpc_server_request_call(
grpc_server *server, grpc_call **call, grpc_call_details *details, grpc_server *server, grpc_call **call, grpc_call_details *details,
grpc_metadata_array *request_metadata, grpc_metadata_array *request_metadata,
grpc_completion_queue *cq_when_rpc_available,
grpc_completion_queue *cq_bound_to_call, grpc_completion_queue *cq_bound_to_call,
void *tag_new); void *tag_new);
...@@ -564,7 +563,6 @@ grpc_call_error grpc_server_request_registered_call( ...@@ -564,7 +563,6 @@ grpc_call_error grpc_server_request_registered_call(
grpc_server *server, void *registered_method, grpc_call **call, grpc_server *server, void *registered_method, grpc_call **call,
gpr_timespec *deadline, grpc_metadata_array *request_metadata, gpr_timespec *deadline, grpc_metadata_array *request_metadata,
grpc_byte_buffer **optional_payload, grpc_byte_buffer **optional_payload,
grpc_completion_queue *cq_when_rpc_available,
grpc_completion_queue *cq_bound_to_call, void *tag_new); grpc_completion_queue *cq_bound_to_call, void *tag_new);
/* Create a server */ /* Create a server */
......
...@@ -74,14 +74,12 @@ typedef struct { ...@@ -74,14 +74,12 @@ typedef struct {
void *tag; void *tag;
union { union {
struct { struct {
grpc_completion_queue *cq_new;
grpc_completion_queue *cq_bind; grpc_completion_queue *cq_bind;
grpc_call **call; grpc_call **call;
grpc_call_details *details; grpc_call_details *details;
grpc_metadata_array *initial_metadata; grpc_metadata_array *initial_metadata;
} batch; } batch;
struct { struct {
grpc_completion_queue *cq_new;
grpc_completion_queue *cq_bind; grpc_completion_queue *cq_bind;
grpc_call **call; grpc_call **call;
registered_method *registered_method; registered_method *registered_method;
...@@ -174,8 +172,6 @@ struct call_data { ...@@ -174,8 +172,6 @@ struct call_data {
call_data **root[CALL_LIST_COUNT]; call_data **root[CALL_LIST_COUNT];
call_link links[CALL_LIST_COUNT]; call_link links[CALL_LIST_COUNT];
grpc_completion_queue *cq_new;
}; };
#define SERVER_FROM_CALL_ELEM(elem) \ #define SERVER_FROM_CALL_ELEM(elem) \
...@@ -187,8 +183,7 @@ static void begin_call(grpc_server *server, call_data *calld, ...@@ -187,8 +183,7 @@ static void begin_call(grpc_server *server, call_data *calld,
requested_call *rc); requested_call *rc);
static void fail_call(grpc_server *server, requested_call *rc); static void fail_call(grpc_server *server, requested_call *rc);
static int call_list_join(call_data **root, call_data *call, static int call_list_join(call_data **root, call_data *call, call_list list) {
call_list list) {
GPR_ASSERT(!call->root[list]); GPR_ASSERT(!call->root[list]);
call->root[list] = root; call->root[list] = root;
if (!*root) { if (!*root) {
...@@ -290,7 +285,10 @@ static void destroy_channel(channel_data *chand) { ...@@ -290,7 +285,10 @@ static void destroy_channel(channel_data *chand) {
grpc_iomgr_add_callback(finish_destroy_channel, chand); grpc_iomgr_add_callback(finish_destroy_channel, chand);
} }
static void finish_start_new_rpc_and_unlock(grpc_server *server, grpc_call_element *elem, call_data **pending_root, requested_call_array *array) { static void finish_start_new_rpc_and_unlock(grpc_server *server,
grpc_call_element *elem,
call_data **pending_root,
requested_call_array *array) {
requested_call rc; requested_call rc;
call_data *calld = elem->call_data; call_data *calld = elem->call_data;
if (array->count == 0) { if (array->count == 0) {
...@@ -318,25 +316,32 @@ static void start_new_rpc(grpc_call_element *elem) { ...@@ -318,25 +316,32 @@ static void start_new_rpc(grpc_call_element *elem) {
/* check for an exact match with host */ /* check for an exact match with host */
hash = GRPC_MDSTR_KV_HASH(calld->host->hash, calld->path->hash); hash = GRPC_MDSTR_KV_HASH(calld->host->hash, calld->path->hash);
for (i = 0; i < chand->registered_method_max_probes; i++) { for (i = 0; i < chand->registered_method_max_probes; i++) {
rm = &chand->registered_methods[(hash + i) % chand->registered_method_slots]; rm = &chand->registered_methods[(hash + i) %
chand->registered_method_slots];
if (!rm) break; if (!rm) break;
if (rm->host != calld->host) continue; if (rm->host != calld->host) continue;
if (rm->method != calld->path) continue; if (rm->method != calld->path) continue;
finish_start_new_rpc_and_unlock(server, elem, &rm->server_registered_method->pending, &rm->server_registered_method->requested); finish_start_new_rpc_and_unlock(server, elem,
&rm->server_registered_method->pending,
&rm->server_registered_method->requested);
return; return;
} }
/* check for a wildcard method definition (no host set) */ /* check for a wildcard method definition (no host set) */
hash = GRPC_MDSTR_KV_HASH(0, calld->path->hash); hash = GRPC_MDSTR_KV_HASH(0, calld->path->hash);
for (i = 0; i < chand->registered_method_max_probes; i++) { for (i = 0; i < chand->registered_method_max_probes; i++) {
rm = &chand->registered_methods[(hash + i) % chand->registered_method_slots]; rm = &chand->registered_methods[(hash + i) %
chand->registered_method_slots];
if (!rm) break; if (!rm) break;
if (rm->host != NULL) continue; if (rm->host != NULL) continue;
if (rm->method != calld->path) continue; if (rm->method != calld->path) continue;
finish_start_new_rpc_and_unlock(server, elem, &rm->server_registered_method->pending, &rm->server_registered_method->requested); finish_start_new_rpc_and_unlock(server, elem,
&rm->server_registered_method->pending,
&rm->server_registered_method->requested);
return; return;
} }
} }
finish_start_new_rpc_and_unlock(server, elem, &server->lists[PENDING_START], &server->requested_calls); finish_start_new_rpc_and_unlock(server, elem, &server->lists[PENDING_START],
&server->requested_calls);
} }
static void kill_zombie(void *elem, int success) { static void kill_zombie(void *elem, int success) {
...@@ -684,7 +689,10 @@ grpc_transport_setup_result grpc_server_setup_transport( ...@@ -684,7 +689,10 @@ grpc_transport_setup_result grpc_server_setup_transport(
host = rm->host ? grpc_mdstr_from_string(mdctx, rm->host) : NULL; host = rm->host ? grpc_mdstr_from_string(mdctx, rm->host) : NULL;
method = grpc_mdstr_from_string(mdctx, rm->host); method = grpc_mdstr_from_string(mdctx, rm->host);
hash = GRPC_MDSTR_KV_HASH(host ? host->hash : 0, method->hash); hash = GRPC_MDSTR_KV_HASH(host ? host->hash : 0, method->hash);
for (probes = 0; chand->registered_methods[(hash + probes) % slots].server_registered_method != NULL; probes++); for (probes = 0; chand->registered_methods[(hash + probes) % slots]
.server_registered_method != NULL;
probes++)
;
if (probes > max_probes) max_probes = probes; if (probes > max_probes) max_probes = probes;
crm = &chand->registered_methods[(hash + probes) % slots]; crm = &chand->registered_methods[(hash + probes) % slots];
crm->server_registered_method = rm; crm->server_registered_method = rm;
...@@ -829,10 +837,12 @@ static grpc_call_error queue_call_request(grpc_server *server, ...@@ -829,10 +837,12 @@ static grpc_call_error queue_call_request(grpc_server *server,
switch (rc->type) { switch (rc->type) {
case LEGACY_CALL: case LEGACY_CALL:
case BATCH_CALL: case BATCH_CALL:
calld = call_list_remove_head(&server->lists[PENDING_START], PENDING_START); calld =
call_list_remove_head(&server->lists[PENDING_START], PENDING_START);
break; break;
case REGISTERED_CALL: case REGISTERED_CALL:
calld = call_list_remove_head(&rc->data.registered.registered_method->pending, PENDING_START); calld = call_list_remove_head(
&rc->data.registered.registered_method->pending, PENDING_START);
break; break;
} }
if (calld) { if (calld) {
...@@ -851,13 +861,12 @@ static grpc_call_error queue_call_request(grpc_server *server, ...@@ -851,13 +861,12 @@ static grpc_call_error queue_call_request(grpc_server *server,
grpc_call_error grpc_server_request_call(grpc_server *server, grpc_call **call, grpc_call_error grpc_server_request_call(grpc_server *server, grpc_call **call,
grpc_call_details *details, grpc_call_details *details,
grpc_metadata_array *initial_metadata, grpc_metadata_array *initial_metadata,
grpc_completion_queue *cq_new, grpc_completion_queue *cq_bind,
grpc_completion_queue *cq_bind, void *tag) { void *tag) {
requested_call rc; requested_call rc;
grpc_cq_begin_op(cq_new, NULL, GRPC_OP_COMPLETE); grpc_cq_begin_op(server->cq, NULL, GRPC_OP_COMPLETE);
rc.type = BATCH_CALL; rc.type = BATCH_CALL;
rc.tag = tag; rc.tag = tag;
rc.data.batch.cq_new = cq_new;
rc.data.batch.cq_bind = cq_bind; rc.data.batch.cq_bind = cq_bind;
rc.data.batch.call = call; rc.data.batch.call = call;
rc.data.batch.details = details; rc.data.batch.details = details;
...@@ -868,13 +877,12 @@ grpc_call_error grpc_server_request_call(grpc_server *server, grpc_call **call, ...@@ -868,13 +877,12 @@ grpc_call_error grpc_server_request_call(grpc_server *server, grpc_call **call,
grpc_call_error grpc_server_request_registered_call( grpc_call_error grpc_server_request_registered_call(
grpc_server *server, void *registered_method, grpc_call **call, grpc_server *server, void *registered_method, grpc_call **call,
gpr_timespec *deadline, grpc_metadata_array *initial_metadata, gpr_timespec *deadline, grpc_metadata_array *initial_metadata,
grpc_byte_buffer **optional_payload, grpc_completion_queue *cq_new, grpc_completion_queue *cq_bind, grpc_byte_buffer **optional_payload, grpc_completion_queue *cq_bind,
void *tag) { void *tag) {
requested_call rc; requested_call rc;
grpc_cq_begin_op(cq_new, NULL, GRPC_OP_COMPLETE); grpc_cq_begin_op(server->cq, NULL, GRPC_OP_COMPLETE);
rc.type = REGISTERED_CALL; rc.type = REGISTERED_CALL;
rc.tag = tag; rc.tag = tag;
rc.data.registered.cq_new = cq_new;
rc.data.registered.cq_bind = cq_bind; rc.data.registered.cq_bind = cq_bind;
rc.data.registered.call = call; rc.data.registered.call = call;
rc.data.registered.registered_method = registered_method; rc.data.registered.registered_method = registered_method;
...@@ -896,7 +904,8 @@ grpc_call_error grpc_server_request_call_old(grpc_server *server, ...@@ -896,7 +904,8 @@ grpc_call_error grpc_server_request_call_old(grpc_server *server,
static void publish_legacy(grpc_call *call, grpc_op_error status, void *tag); static void publish_legacy(grpc_call *call, grpc_op_error status, void *tag);
static void publish_registered_or_batch(grpc_call *call, grpc_op_error status, static void publish_registered_or_batch(grpc_call *call, grpc_op_error status,
void *tag); void *tag);
static void publish_was_not_set(grpc_call *call, grpc_op_error status, void *tag) { static void publish_was_not_set(grpc_call *call, grpc_op_error status,
void *tag) {
abort(); abort();
} }
...@@ -942,7 +951,6 @@ static void begin_call(grpc_server *server, call_data *calld, ...@@ -942,7 +951,6 @@ static void begin_call(grpc_server *server, call_data *calld,
r->op = GRPC_IOREQ_RECV_INITIAL_METADATA; r->op = GRPC_IOREQ_RECV_INITIAL_METADATA;
r->data.recv_metadata = rc->data.batch.initial_metadata; r->data.recv_metadata = rc->data.batch.initial_metadata;
r++; r++;
calld->cq_new = rc->data.batch.cq_new;
publish = publish_registered_or_batch; publish = publish_registered_or_batch;
break; break;
case REGISTERED_CALL: case REGISTERED_CALL:
...@@ -957,7 +965,6 @@ static void begin_call(grpc_server *server, call_data *calld, ...@@ -957,7 +965,6 @@ static void begin_call(grpc_server *server, call_data *calld,
r->data.recv_message = rc->data.registered.optional_payload; r->data.recv_message = rc->data.registered.optional_payload;
r++; r++;
} }
calld->cq_new = rc->data.registered.cq_new;
publish = publish_registered_or_batch; publish = publish_registered_or_batch;
break; break;
} }
...@@ -976,14 +983,14 @@ static void fail_call(grpc_server *server, requested_call *rc) { ...@@ -976,14 +983,14 @@ static void fail_call(grpc_server *server, requested_call *rc) {
case BATCH_CALL: case BATCH_CALL:
*rc->data.batch.call = NULL; *rc->data.batch.call = NULL;
rc->data.batch.initial_metadata->count = 0; rc->data.batch.initial_metadata->count = 0;
grpc_cq_end_op_complete(rc->data.batch.cq_new, rc->tag, NULL, do_nothing, grpc_cq_end_op_complete(server->cq, rc->tag, NULL, do_nothing, NULL,
NULL, GRPC_OP_ERROR); GRPC_OP_ERROR);
break; break;
case REGISTERED_CALL: case REGISTERED_CALL:
*rc->data.registered.call = NULL; *rc->data.registered.call = NULL;
rc->data.registered.initial_metadata->count = 0; rc->data.registered.initial_metadata->count = 0;
grpc_cq_end_op_complete(rc->data.registered.cq_new, rc->tag, NULL, do_nothing, grpc_cq_end_op_complete(server->cq, rc->tag, NULL, do_nothing, NULL,
NULL, GRPC_OP_ERROR); GRPC_OP_ERROR);
break; break;
} }
} }
...@@ -1011,9 +1018,9 @@ static void publish_registered_or_batch(grpc_call *call, grpc_op_error status, ...@@ -1011,9 +1018,9 @@ static void publish_registered_or_batch(grpc_call *call, grpc_op_error status,
void *tag) { void *tag) {
grpc_call_element *elem = grpc_call_element *elem =
grpc_call_stack_element(grpc_call_get_call_stack(call), 0); grpc_call_stack_element(grpc_call_get_call_stack(call), 0);
call_data *calld = elem->call_data; channel_data *chand = elem->channel_data;
grpc_cq_end_op_complete(calld->cq_new, tag, call, grpc_server *server = chand->server;
do_nothing, NULL, status); grpc_cq_end_op_complete(server->cq, tag, call, do_nothing, NULL, status);
} }
const grpc_channel_args *grpc_server_get_channel_args(grpc_server *server) { const grpc_channel_args *grpc_server_get_channel_args(grpc_server *server) {
......
...@@ -56,9 +56,9 @@ Server::Server(ThreadPoolInterface *thread_pool, bool thread_pool_owned, ...@@ -56,9 +56,9 @@ Server::Server(ThreadPoolInterface *thread_pool, bool thread_pool_owned,
thread_pool_owned_(thread_pool_owned), thread_pool_owned_(thread_pool_owned),
secure_(creds != nullptr) { secure_(creds != nullptr) {
if (creds) { if (creds) {
server_ = grpc_secure_server_create(creds->GetRawCreds(), nullptr, nullptr); server_ = grpc_secure_server_create(creds->GetRawCreds(), cq_.cq(), nullptr);
} else { } else {
server_ = grpc_server_create(nullptr, nullptr); server_ = grpc_server_create(cq_.cq(), nullptr);
} }
} }
...@@ -82,9 +82,6 @@ Server::~Server() { ...@@ -82,9 +82,6 @@ Server::~Server() {
} }
bool Server::RegisterService(RpcService *service) { bool Server::RegisterService(RpcService *service) {
if (!cq_sync_) {
cq_sync_.reset(new CompletionQueue);
}
for (int i = 0; i < service->GetMethodCount(); ++i) { for (int i = 0; i < service->GetMethodCount(); ++i) {
RpcServiceMethod *method = service->GetMethod(i); RpcServiceMethod *method = service->GetMethod(i);
void *tag = grpc_server_register_method(server_, method->name(), nullptr); void *tag = grpc_server_register_method(server_, method->name(), nullptr);
...@@ -131,14 +128,14 @@ class Server::MethodRequestData final : public CompletionQueueTag { ...@@ -131,14 +128,14 @@ class Server::MethodRequestData final : public CompletionQueueTag {
return mrd; return mrd;
} }
void Request(grpc_server *server, CompletionQueue *cq) { void Request(grpc_server *server) {
GPR_ASSERT(!in_flight_); GPR_ASSERT(!in_flight_);
in_flight_ = true; in_flight_ = true;
cq_ = grpc_completion_queue_create(); cq_ = grpc_completion_queue_create();
GPR_ASSERT(GRPC_CALL_OK == GPR_ASSERT(GRPC_CALL_OK ==
grpc_server_request_registered_call( grpc_server_request_registered_call(
server, tag_, &call_, &deadline_, &request_metadata_, server, tag_, &call_, &deadline_, &request_metadata_,
has_request_payload_ ? &request_payload_ : nullptr, cq->cq(), has_request_payload_ ? &request_payload_ : nullptr,
cq_, this)); cq_, this));
} }
...@@ -212,9 +209,9 @@ bool Server::Start() { ...@@ -212,9 +209,9 @@ bool Server::Start() {
grpc_server_start(server_); grpc_server_start(server_);
// Start processing rpcs. // Start processing rpcs.
if (cq_sync_) { if (!methods_.empty()) {
for (auto &m : methods_) { for (auto &m : methods_) {
m.Request(server_, cq_sync_.get()); m.Request(server_);
} }
ScheduleCallback(); ScheduleCallback();
...@@ -249,12 +246,12 @@ void Server::ScheduleCallback() { ...@@ -249,12 +246,12 @@ void Server::ScheduleCallback() {
void Server::RunRpc() { void Server::RunRpc() {
// Wait for one more incoming rpc. // Wait for one more incoming rpc.
bool ok; bool ok;
auto *mrd = MethodRequestData::Wait(cq_sync_.get(), &ok); auto *mrd = MethodRequestData::Wait(&cq_, &ok);
if (mrd) { if (mrd) {
MethodRequestData::CallData cd(mrd); MethodRequestData::CallData cd(mrd);
if (ok) { if (ok) {
mrd->Request(server_, cq_sync_.get()); mrd->Request(server_);
ScheduleCallback(); ScheduleCallback();
cd.Run(); cd.Run();
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment