diff --git a/include/grpc++/server.h b/include/grpc++/server.h index 5b4cb6f21472768d07bdbe2f5fbbb34a5553902a..bae83eee3f5bed049064c6721b7a30aee274609e 100644 --- a/include/grpc++/server.h +++ b/include/grpc++/server.h @@ -135,10 +135,13 @@ class Server GRPC_FINAL : public ServerInterface, private GrpcLibraryCodegen { /// \param max_pollers The maximum number of polling threads per server /// completion queue (in param sync_server_cqs) to use for listening to /// incoming requests (used only in case of sync server) + /// + /// \param sync_cq_timeout_msec The timeout to use when calling AsyncNext() on + /// server completion queues passed via sync_server_cqs param. Server(std::shared_ptr<std::vector<std::unique_ptr<ServerCompletionQueue>>> sync_server_cqs, int max_message_size, ChannelArguments* args, int min_pollers, - int max_pollers); + int max_pollers, int sync_cq_timeout_msec); /// Register a service. This call does not take ownership of the service. /// The service must exist for the lifetime of the Server instance. diff --git a/include/grpc++/server_builder.h b/include/grpc++/server_builder.h index d9a6878317706491ca8708d0ce589d989bcce44d..8fac168ff7e5de96119622222dd80687064a8963 100644 --- a/include/grpc++/server_builder.h +++ b/include/grpc++/server_builder.h @@ -62,6 +62,22 @@ class ServerBuilder { public: ServerBuilder(); + struct SyncServerSettings { + // Number of server completion queues to create to listen to incoming RPCs. + int num_cqs; + + // Minimum number of threads per completion queue that should be listening + // to incoming RPCs. + int min_pollers; + + // Maximum number of threads per completion queue that can be listening to + // incoming RPCs. + int max_pollers; + + // The timeout for server completion queue's AsyncNext call. + int cq_timeout_msec; + }; + /// Register a service. This call does not take ownership of the service. /// The service must exist for the lifetime of the \a Server instance returned /// by \a BuildAndStart(). @@ -115,6 +131,9 @@ class ServerBuilder { ServerBuilder& SetOption(std::unique_ptr<ServerBuilderOption> option); + /// Note: Only useful if this is a Synchronous server. + void SetSyncServerSettings(SyncServerSettings settings); + /// Tries to bind \a server to the given \a addr. /// /// It can be invoked multiple times. @@ -164,18 +183,20 @@ class ServerBuilder { private: friend class ::grpc::testing::ServerBuilderPluginTest; - // TODO (sreek) Make these configurable - // The default number of minimum and maximum number of polling threads needed - // per completion queue. These are only used in case of Sync server - const int kDefaultMinPollers = 1; - const int kDefaultMaxPollers = -1; // Unlimited - struct Port { grpc::string addr; std::shared_ptr<ServerCredentials> creds; int* selected_port; }; + // Sync server settings. If this is not set via SetSyncServerSettings(), the + // following default values are used: + // sync_server_settings_.num_cqs = Number of CPUs + // sync_server_settings_.min_pollers = 1 + // sync_server_settings_.max_pollers = INT_MAX + // sync_server_settings_.cq_timeout_msec = 1000 + struct SyncServerSettings sync_server_settings_; + typedef std::unique_ptr<grpc::string> HostString; struct NamedService { explicit NamedService(Service* s) : service(s) {} diff --git a/src/cpp/server/server_builder.cc b/src/cpp/server/server_builder.cc index eab57b4ac3294cc23350cf3492bb6b6a7469574c..1a27100be5faf010fc6dc8d899c59f78fa272827 100644 --- a/src/cpp/server/server_builder.cc +++ b/src/cpp/server/server_builder.cc @@ -62,6 +62,7 @@ ServerBuilder::ServerBuilder() auto& factory = *it; plugins_.emplace_back(factory()); } + // all compression algorithms enabled by default. enabled_compression_algorithms_bitset_ = (1u << GRPC_COMPRESS_ALGORITHMS_COUNT) - 1; @@ -69,6 +70,17 @@ ServerBuilder::ServerBuilder() sizeof(maybe_default_compression_level_)); memset(&maybe_default_compression_algorithm_, 0, sizeof(maybe_default_compression_algorithm_)); + + + // Sync server setting defaults + sync_server_settings_.min_pollers = 1; + sync_server_settings_.max_pollers = INT_MAX; + + int num_cpus = gpr_cpu_num_cores(); + num_cpus = GPR_MAX(num_cpus, 4); + sync_server_settings_.num_cqs = num_cpus; + + sync_server_settings_.cq_timeout_msec = 1000; } std::unique_ptr<ServerCompletionQueue> ServerBuilder::AddCompletionQueue( @@ -131,6 +143,10 @@ ServerBuilder& ServerBuilder::SetDefaultCompressionAlgorithm( return *this; } +void ServerBuilder:: SetSyncServerSettings(SyncServerSettings settings) { + sync_server_settings_ = settings; // copy the settings +} + ServerBuilder& ServerBuilder::AddListeningPort( const grpc::string& addr, std::shared_ptr<ServerCredentials> creds, int* selected_port) { @@ -200,23 +216,17 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() { if (has_sync_methods) { // If the server has synchronous methods, it will need completion queues to - // handle those methods. Create one cq per core (or create 4 if number of - // cores is less than 4 or unavailable) - // - // TODO (sreek) - The default number 4 is just a guess. Check if a lower or - // higher number makes sense - int num_cqs = gpr_cpu_num_cores(); - num_cqs = GPR_MAX(num_cqs, 4); - - for (int i = 0; i < num_cqs; i++) { + // handle those methods. + for (int i = 0; i < sync_server_settings_.num_cqs; i++) { sync_server_cqs->emplace_back(new ServerCompletionQueue()); } } // TODO (sreek) Make the number of pollers configurable - std::unique_ptr<Server> server( - new Server(sync_server_cqs, max_receive_message_size_, &args, - kDefaultMinPollers, kDefaultMaxPollers)); + std::unique_ptr<Server> server(new Server( + sync_server_cqs, max_receive_message_size_, &args, + sync_server_settings_.min_pollers, sync_server_settings_.max_pollers, + sync_server_settings_.cq_timeout_msec)); ServerInitializer* initializer = server->initializer(); diff --git a/src/cpp/server/server_cc.cc b/src/cpp/server/server_cc.cc index e9f3c9978865db627edddf27b4f7ba3e480ded52..36bc61fdf1b88030b90f9045d05bccc43304aedf 100644 --- a/src/cpp/server/server_cc.cc +++ b/src/cpp/server/server_cc.cc @@ -291,18 +291,17 @@ class Server::SyncRequestManager : public GrpcRpcManager { public: SyncRequestManager(Server* server, CompletionQueue* server_cq, std::shared_ptr<GlobalCallbacks> global_callbacks, - int min_pollers, int max_pollers) + int min_pollers, int max_pollers, int cq_timeout_msec) : GrpcRpcManager(min_pollers, max_pollers), server_(server), server_cq_(server_cq), + cq_timeout_msec_(cq_timeout_msec), global_callbacks_(global_callbacks) {} - static const int kRpcPollingTimeoutMsec = 3000; - WorkStatus PollForWork(void** tag, bool* ok) GRPC_OVERRIDE { *tag = nullptr; gpr_timespec deadline = - gpr_time_from_millis(kRpcPollingTimeoutMsec, GPR_TIMESPAN); + gpr_time_from_millis(cq_timeout_msec_, GPR_TIMESPAN); switch (server_cq_->AsyncNext(tag, ok, deadline)) { case CompletionQueue::TIMEOUT: @@ -389,6 +388,7 @@ class Server::SyncRequestManager : public GrpcRpcManager { private: Server* server_; CompletionQueue* server_cq_; + int cq_timeout_msec_; std::vector<SyncRequest> sync_methods_; std::unique_ptr<RpcServiceMethod> unknown_method_; std::shared_ptr<Server::GlobalCallbacks> global_callbacks_; @@ -399,7 +399,7 @@ Server::Server( std::shared_ptr<std::vector<std::unique_ptr<ServerCompletionQueue>>> sync_server_cqs, int max_receive_message_size, ChannelArguments* args, int min_pollers, - int max_pollers) + int max_pollers, int sync_cq_timeout_msec) : max_receive_message_size_(max_receive_message_size), sync_server_cqs_(sync_server_cqs), started_(false), @@ -415,8 +415,9 @@ Server::Server( for (auto it = sync_server_cqs_->begin(); it != sync_server_cqs_->end(); it++) { - sync_req_mgrs_.emplace_back(new SyncRequestManager( - this, (*it).get(), global_callbacks_, min_pollers, max_pollers)); + sync_req_mgrs_.emplace_back( + new SyncRequestManager(this, (*it).get(), global_callbacks_, + min_pollers, max_pollers, sync_cq_timeout_msec)); } grpc_channel_args channel_args; @@ -606,7 +607,7 @@ void Server::ShutdownInternal(gpr_timespec deadline) { // Drain the shutdown queue (if the previous call to AsyncNext() timed out // and we didn't remove the tag from the queue yet) - while(shutdown_cq.Next(&tag, &ok)) { + while (shutdown_cq.Next(&tag, &ok)) { // Nothing to be done here } diff --git a/test/cpp/end2end/end2end_test.cc b/test/cpp/end2end/end2end_test.cc index b1d3ce92f6aa8f4a3bb99c805603ef162832025f..a46f9f268bc6bfa74d6eb94fddb51fa6c63e64a1 100644 --- a/test/cpp/end2end/end2end_test.cc +++ b/test/cpp/end2end/end2end_test.cc @@ -226,6 +226,11 @@ class End2endTest : public ::testing::TestWithParam<TestScenario> { kMaxMessageSize_(8192), special_service_("special") { GetParam().Log(); + + sync_server_settings_.max_pollers = INT_MAX; + sync_server_settings_.min_pollers = 1; + sync_server_settings_.cq_timeout_msec = 10; + sync_server_settings_.num_cqs = 4; } void TearDown() GRPC_OVERRIDE { @@ -250,6 +255,9 @@ class End2endTest : public ::testing::TestWithParam<TestScenario> { builder.SetMaxMessageSize( kMaxMessageSize_); // For testing max message size. builder.RegisterService(&dup_pkg_service_); + + builder.SetSyncServerSettings(sync_server_settings_); + server_ = builder.BuildAndStart(); is_server_started_ = true; } @@ -279,6 +287,8 @@ class End2endTest : public ::testing::TestWithParam<TestScenario> { ServerBuilder builder; builder.AddListeningPort(proxyaddr.str(), InsecureServerCredentials()); builder.RegisterService(proxy_service_.get()); + builder.SetSyncServerSettings(sync_server_settings_); + proxy_server_ = builder.BuildAndStart(); channel_ = CreateChannel(proxyaddr.str(), InsecureChannelCredentials()); @@ -299,6 +309,7 @@ class End2endTest : public ::testing::TestWithParam<TestScenario> { TestServiceImpl special_service_; TestServiceImplDupPkg dup_pkg_service_; grpc::string user_agent_prefix_; + ServerBuilder::SyncServerSettings sync_server_settings_; }; static void SendRpc(grpc::testing::EchoTestService::Stub* stub, int num_rpcs,