Skip to content
Snippets Groups Projects
Commit 1a434052 authored by Craig Tiller's avatar Craig Tiller
Browse files

Merge pull request #6706 from ctiller/rraf

Round robin notifier pollset
parents d7bbd38b 15d7f3cd
No related branches found
No related tags found
No related merge requests found
...@@ -128,6 +128,9 @@ struct grpc_tcp_server { ...@@ -128,6 +128,9 @@ struct grpc_tcp_server {
grpc_pollset **pollsets; grpc_pollset **pollsets;
/* number of pollsets in the pollsets array */ /* number of pollsets in the pollsets array */
size_t pollset_count; size_t pollset_count;
/* next pollset to assign a channel to */
size_t next_pollset_to_assign;
}; };
grpc_tcp_server *grpc_tcp_server_create(grpc_closure *shutdown_complete) { grpc_tcp_server *grpc_tcp_server_create(grpc_closure *shutdown_complete) {
...@@ -145,6 +148,7 @@ grpc_tcp_server *grpc_tcp_server_create(grpc_closure *shutdown_complete) { ...@@ -145,6 +148,7 @@ grpc_tcp_server *grpc_tcp_server_create(grpc_closure *shutdown_complete) {
s->head = NULL; s->head = NULL;
s->tail = NULL; s->tail = NULL;
s->nports = 0; s->nports = 0;
s->next_pollset_to_assign = 0;
return s; return s;
} }
...@@ -317,7 +321,9 @@ static void on_read(grpc_exec_ctx *exec_ctx, void *arg, bool success) { ...@@ -317,7 +321,9 @@ static void on_read(grpc_exec_ctx *exec_ctx, void *arg, bool success) {
goto error; goto error;
} }
read_notifier_pollset = grpc_fd_get_read_notifier_pollset(exec_ctx, sp->emfd); read_notifier_pollset =
sp->server->pollsets[(sp->server->next_pollset_to_assign++) %
sp->server->pollset_count];
/* loop until accept4 returns EAGAIN, and then re-arm notification */ /* loop until accept4 returns EAGAIN, and then re-arm notification */
for (;;) { for (;;) {
......
...@@ -35,15 +35,15 @@ ...@@ -35,15 +35,15 @@
#include "server.h" #include "server.h"
#include <node.h>
#include <nan.h> #include <nan.h>
#include <node.h>
#include <vector> #include <vector>
#include "call.h"
#include "completion_queue_async_worker.h"
#include "grpc/grpc.h" #include "grpc/grpc.h"
#include "grpc/grpc_security.h" #include "grpc/grpc_security.h"
#include "grpc/support/log.h" #include "grpc/support/log.h"
#include "call.h"
#include "completion_queue_async_worker.h"
#include "server_credentials.h" #include "server_credentials.h"
#include "timeval.h" #include "timeval.h"
...@@ -100,8 +100,8 @@ class NewCallOp : public Op { ...@@ -100,8 +100,8 @@ class NewCallOp : public Op {
Nan::Set(obj, Nan::New("host").ToLocalChecked(), Nan::Set(obj, Nan::New("host").ToLocalChecked(),
Nan::New(details.host).ToLocalChecked()); Nan::New(details.host).ToLocalChecked());
Nan::Set(obj, Nan::New("deadline").ToLocalChecked(), Nan::Set(obj, Nan::New("deadline").ToLocalChecked(),
Nan::New<Date>( Nan::New<Date>(TimespecToMilliseconds(details.deadline))
TimespecToMilliseconds(details.deadline)).ToLocalChecked()); .ToLocalChecked());
Nan::Set(obj, Nan::New("metadata").ToLocalChecked(), Nan::Set(obj, Nan::New("metadata").ToLocalChecked(),
ParseMetadata(&request_metadata)); ParseMetadata(&request_metadata));
return scope.Escape(obj); return scope.Escape(obj);
...@@ -117,14 +117,13 @@ class NewCallOp : public Op { ...@@ -117,14 +117,13 @@ class NewCallOp : public Op {
grpc_metadata_array request_metadata; grpc_metadata_array request_metadata;
protected: protected:
std::string GetTypeString() const { std::string GetTypeString() const { return "new_call"; }
return "new_call";
}
}; };
Server::Server(grpc_server *server) : wrapped_server(server) { Server::Server(grpc_server *server) : wrapped_server(server) {
shutdown_queue = grpc_completion_queue_create(NULL); shutdown_queue = grpc_completion_queue_create(NULL);
grpc_server_register_completion_queue(server, shutdown_queue, NULL); grpc_server_register_non_listening_completion_queue(server, shutdown_queue,
NULL);
} }
Server::~Server() { Server::~Server() {
...@@ -156,8 +155,7 @@ bool Server::HasInstance(Local<Value> val) { ...@@ -156,8 +155,7 @@ bool Server::HasInstance(Local<Value> val) {
} }
void Server::ShutdownServer() { void Server::ShutdownServer() {
grpc_server_shutdown_and_notify(this->wrapped_server, grpc_server_shutdown_and_notify(this->wrapped_server, this->shutdown_queue,
this->shutdown_queue,
NULL); NULL);
grpc_server_cancel_all_calls(this->wrapped_server); grpc_server_cancel_all_calls(this->wrapped_server);
grpc_completion_queue_pluck(this->shutdown_queue, NULL, grpc_completion_queue_pluck(this->shutdown_queue, NULL,
...@@ -170,8 +168,8 @@ NAN_METHOD(Server::New) { ...@@ -170,8 +168,8 @@ NAN_METHOD(Server::New) {
if (!info.IsConstructCall()) { if (!info.IsConstructCall()) {
const int argc = 1; const int argc = 1;
Local<Value> argv[argc] = {info[0]}; Local<Value> argv[argc] = {info[0]};
MaybeLocal<Object> maybe_instance = constructor->GetFunction()->NewInstance( MaybeLocal<Object> maybe_instance =
argc, argv); constructor->GetFunction()->NewInstance(argc, argv);
if (maybe_instance.IsEmpty()) { if (maybe_instance.IsEmpty()) {
// There's probably a pending exception // There's probably a pending exception
return; return;
...@@ -185,8 +183,9 @@ NAN_METHOD(Server::New) { ...@@ -185,8 +183,9 @@ NAN_METHOD(Server::New) {
grpc_channel_args *channel_args; grpc_channel_args *channel_args;
if (!ParseChannelArgs(info[0], &channel_args)) { if (!ParseChannelArgs(info[0], &channel_args)) {
DeallocateChannelArgs(channel_args); DeallocateChannelArgs(channel_args);
return Nan::ThrowTypeError("Server options must be an object with " return Nan::ThrowTypeError(
"string keys and integer or string values"); "Server options must be an object with "
"string keys and integer or string values");
} }
wrapped_server = grpc_server_create(channel_args, NULL); wrapped_server = grpc_server_create(channel_args, NULL);
DeallocateChannelArgs(channel_args); DeallocateChannelArgs(channel_args);
...@@ -218,8 +217,7 @@ NAN_METHOD(Server::RequestCall) { ...@@ -218,8 +217,7 @@ NAN_METHOD(Server::RequestCall) {
NAN_METHOD(Server::AddHttp2Port) { NAN_METHOD(Server::AddHttp2Port) {
if (!HasInstance(info.This())) { if (!HasInstance(info.This())) {
return Nan::ThrowTypeError( return Nan::ThrowTypeError("addHttp2Port can only be called on a Server");
"addHttp2Port can only be called on a Server");
} }
if (!info[0]->IsString()) { if (!info[0]->IsString()) {
return Nan::ThrowTypeError( return Nan::ThrowTypeError(
...@@ -239,8 +237,7 @@ NAN_METHOD(Server::AddHttp2Port) { ...@@ -239,8 +237,7 @@ NAN_METHOD(Server::AddHttp2Port) {
*Utf8String(info[0])); *Utf8String(info[0]));
} else { } else {
port = grpc_server_add_secure_http2_port(server->wrapped_server, port = grpc_server_add_secure_http2_port(server->wrapped_server,
*Utf8String(info[0]), *Utf8String(info[0]), creds);
creds);
} }
info.GetReturnValue().Set(Nan::New<Number>(port)); info.GetReturnValue().Set(Nan::New<Number>(port));
} }
...@@ -262,8 +259,7 @@ NAN_METHOD(Server::TryShutdown) { ...@@ -262,8 +259,7 @@ NAN_METHOD(Server::TryShutdown) {
Server *server = ObjectWrap::Unwrap<Server>(info.This()); Server *server = ObjectWrap::Unwrap<Server>(info.This());
unique_ptr<OpVec> ops(new OpVec()); unique_ptr<OpVec> ops(new OpVec());
grpc_server_shutdown_and_notify( grpc_server_shutdown_and_notify(
server->wrapped_server, server->wrapped_server, CompletionQueueAsyncWorker::GetQueue(),
CompletionQueueAsyncWorker::GetQueue(),
new struct tag(new Nan::Callback(info[0].As<Function>()), ops.release(), new struct tag(new Nan::Callback(info[0].As<Function>()), ops.release(),
shared_ptr<Resources>(nullptr))); shared_ptr<Resources>(nullptr)));
CompletionQueueAsyncWorker::Next(); CompletionQueueAsyncWorker::Next();
......
...@@ -336,6 +336,8 @@ cdef extern from "grpc/_cython/loader.h": ...@@ -336,6 +336,8 @@ cdef extern from "grpc/_cython/loader.h":
void grpc_server_register_completion_queue(grpc_server *server, void grpc_server_register_completion_queue(grpc_server *server,
grpc_completion_queue *cq, grpc_completion_queue *cq,
void *reserved) nogil void *reserved) nogil
void grpc_server_register_non_listening_completion_queue(
grpc_server *server, grpc_completion_queue *cq, void *reserved) nogil
int grpc_server_add_insecure_http2_port( int grpc_server_add_insecure_http2_port(
grpc_server *server, const char *addr) nogil grpc_server *server, const char *addr) nogil
void grpc_server_start(grpc_server *server) nogil void grpc_server_start(grpc_server *server) nogil
......
...@@ -81,11 +81,20 @@ cdef class Server: ...@@ -81,11 +81,20 @@ cdef class Server:
self.c_server, queue.c_completion_queue, NULL) self.c_server, queue.c_completion_queue, NULL)
self.registered_completion_queues.append(queue) self.registered_completion_queues.append(queue)
def register_non_listening_completion_queue(
self, CompletionQueue queue not None):
if self.is_started:
raise ValueError("cannot register completion queues after start")
with nogil:
grpc_server_register_non_listening_completion_queue(
self.c_server, queue.c_completion_queue, NULL)
self.registered_completion_queues.append(queue)
def start(self): def start(self):
if self.is_started: if self.is_started:
raise ValueError("the server has already started") raise ValueError("the server has already started")
self.backup_shutdown_queue = CompletionQueue() self.backup_shutdown_queue = CompletionQueue()
self.register_completion_queue(self.backup_shutdown_queue) self.register_non_listening_completion_queue(self.backup_shutdown_queue)
self.is_started = True self.is_started = True
with nogil: with nogil:
grpc_server_start(self.c_server) grpc_server_start(self.c_server)
...@@ -169,4 +178,3 @@ cdef class Server: ...@@ -169,4 +178,3 @@ cdef class Server:
time.sleep(0) time.sleep(0)
with nogil: with nogil:
grpc_server_destroy(self.c_server) grpc_server_destroy(self.c_server)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment