Skip to content
Snippets Groups Projects
Commit 892dbf4d authored by Sree Kuchibhotla's avatar Sree Kuchibhotla
Browse files

Make settings configurable

parent 6b403331
No related branches found
No related tags found
No related merge requests found
......@@ -135,10 +135,13 @@ class Server GRPC_FINAL : public ServerInterface, private GrpcLibraryCodegen {
/// \param max_pollers The maximum number of polling threads per server
/// completion queue (in param sync_server_cqs) to use for listening to
/// incoming requests (used only in case of sync server)
///
/// \param sync_cq_timeout_msec The timeout to use when calling AsyncNext() on
/// server completion queues passed via sync_server_cqs param.
Server(std::shared_ptr<std::vector<std::unique_ptr<ServerCompletionQueue>>>
sync_server_cqs,
int max_message_size, ChannelArguments* args, int min_pollers,
int max_pollers);
int max_pollers, int sync_cq_timeout_msec);
/// Register a service. This call does not take ownership of the service.
/// The service must exist for the lifetime of the Server instance.
......
......@@ -62,6 +62,22 @@ class ServerBuilder {
public:
ServerBuilder();
struct SyncServerSettings {
// Number of server completion queues to create to listen to incoming RPCs.
int num_cqs;
// Minimum number of threads per completion queue that should be listening
// to incoming RPCs.
int min_pollers;
// Maximum number of threads per completion queue that can be listening to
// incoming RPCs.
int max_pollers;
// The timeout for server completion queue's AsyncNext call.
int cq_timeout_msec;
};
/// Register a service. This call does not take ownership of the service.
/// The service must exist for the lifetime of the \a Server instance returned
/// by \a BuildAndStart().
......@@ -115,6 +131,9 @@ class ServerBuilder {
ServerBuilder& SetOption(std::unique_ptr<ServerBuilderOption> option);
/// Note: Only useful if this is a Synchronous server.
void SetSyncServerSettings(SyncServerSettings settings);
/// Tries to bind \a server to the given \a addr.
///
/// It can be invoked multiple times.
......@@ -164,18 +183,20 @@ class ServerBuilder {
private:
friend class ::grpc::testing::ServerBuilderPluginTest;
// TODO (sreek) Make these configurable
// The default number of minimum and maximum number of polling threads needed
// per completion queue. These are only used in case of Sync server
const int kDefaultMinPollers = 1;
const int kDefaultMaxPollers = -1; // Unlimited
struct Port {
grpc::string addr;
std::shared_ptr<ServerCredentials> creds;
int* selected_port;
};
// Sync server settings. If this is not set via SetSyncServerSettings(), the
// following default values are used:
// sync_server_settings_.num_cqs = Number of CPUs
// sync_server_settings_.min_pollers = 1
// sync_server_settings_.max_pollers = INT_MAX
// sync_server_settings_.cq_timeout_msec = 1000
struct SyncServerSettings sync_server_settings_;
typedef std::unique_ptr<grpc::string> HostString;
struct NamedService {
explicit NamedService(Service* s) : service(s) {}
......
......@@ -62,6 +62,7 @@ ServerBuilder::ServerBuilder()
auto& factory = *it;
plugins_.emplace_back(factory());
}
// all compression algorithms enabled by default.
enabled_compression_algorithms_bitset_ =
(1u << GRPC_COMPRESS_ALGORITHMS_COUNT) - 1;
......@@ -69,6 +70,17 @@ ServerBuilder::ServerBuilder()
sizeof(maybe_default_compression_level_));
memset(&maybe_default_compression_algorithm_, 0,
sizeof(maybe_default_compression_algorithm_));
// Sync server setting defaults
sync_server_settings_.min_pollers = 1;
sync_server_settings_.max_pollers = INT_MAX;
int num_cpus = gpr_cpu_num_cores();
num_cpus = GPR_MAX(num_cpus, 4);
sync_server_settings_.num_cqs = num_cpus;
sync_server_settings_.cq_timeout_msec = 1000;
}
std::unique_ptr<ServerCompletionQueue> ServerBuilder::AddCompletionQueue(
......@@ -131,6 +143,10 @@ ServerBuilder& ServerBuilder::SetDefaultCompressionAlgorithm(
return *this;
}
void ServerBuilder:: SetSyncServerSettings(SyncServerSettings settings) {
sync_server_settings_ = settings; // copy the settings
}
ServerBuilder& ServerBuilder::AddListeningPort(
const grpc::string& addr, std::shared_ptr<ServerCredentials> creds,
int* selected_port) {
......@@ -200,23 +216,17 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() {
if (has_sync_methods) {
// If the server has synchronous methods, it will need completion queues to
// handle those methods. Create one cq per core (or create 4 if number of
// cores is less than 4 or unavailable)
//
// TODO (sreek) - The default number 4 is just a guess. Check if a lower or
// higher number makes sense
int num_cqs = gpr_cpu_num_cores();
num_cqs = GPR_MAX(num_cqs, 4);
for (int i = 0; i < num_cqs; i++) {
// handle those methods.
for (int i = 0; i < sync_server_settings_.num_cqs; i++) {
sync_server_cqs->emplace_back(new ServerCompletionQueue());
}
}
// TODO (sreek) Make the number of pollers configurable
std::unique_ptr<Server> server(
new Server(sync_server_cqs, max_receive_message_size_, &args,
kDefaultMinPollers, kDefaultMaxPollers));
std::unique_ptr<Server> server(new Server(
sync_server_cqs, max_receive_message_size_, &args,
sync_server_settings_.min_pollers, sync_server_settings_.max_pollers,
sync_server_settings_.cq_timeout_msec));
ServerInitializer* initializer = server->initializer();
......
......@@ -291,18 +291,17 @@ class Server::SyncRequestManager : public GrpcRpcManager {
public:
SyncRequestManager(Server* server, CompletionQueue* server_cq,
std::shared_ptr<GlobalCallbacks> global_callbacks,
int min_pollers, int max_pollers)
int min_pollers, int max_pollers, int cq_timeout_msec)
: GrpcRpcManager(min_pollers, max_pollers),
server_(server),
server_cq_(server_cq),
cq_timeout_msec_(cq_timeout_msec),
global_callbacks_(global_callbacks) {}
static const int kRpcPollingTimeoutMsec = 3000;
WorkStatus PollForWork(void** tag, bool* ok) GRPC_OVERRIDE {
*tag = nullptr;
gpr_timespec deadline =
gpr_time_from_millis(kRpcPollingTimeoutMsec, GPR_TIMESPAN);
gpr_time_from_millis(cq_timeout_msec_, GPR_TIMESPAN);
switch (server_cq_->AsyncNext(tag, ok, deadline)) {
case CompletionQueue::TIMEOUT:
......@@ -389,6 +388,7 @@ class Server::SyncRequestManager : public GrpcRpcManager {
private:
Server* server_;
CompletionQueue* server_cq_;
int cq_timeout_msec_;
std::vector<SyncRequest> sync_methods_;
std::unique_ptr<RpcServiceMethod> unknown_method_;
std::shared_ptr<Server::GlobalCallbacks> global_callbacks_;
......@@ -399,7 +399,7 @@ Server::Server(
std::shared_ptr<std::vector<std::unique_ptr<ServerCompletionQueue>>>
sync_server_cqs,
int max_receive_message_size, ChannelArguments* args, int min_pollers,
int max_pollers)
int max_pollers, int sync_cq_timeout_msec)
: max_receive_message_size_(max_receive_message_size),
sync_server_cqs_(sync_server_cqs),
started_(false),
......@@ -415,8 +415,9 @@ Server::Server(
for (auto it = sync_server_cqs_->begin(); it != sync_server_cqs_->end();
it++) {
sync_req_mgrs_.emplace_back(new SyncRequestManager(
this, (*it).get(), global_callbacks_, min_pollers, max_pollers));
sync_req_mgrs_.emplace_back(
new SyncRequestManager(this, (*it).get(), global_callbacks_,
min_pollers, max_pollers, sync_cq_timeout_msec));
}
grpc_channel_args channel_args;
......@@ -606,7 +607,7 @@ void Server::ShutdownInternal(gpr_timespec deadline) {
// Drain the shutdown queue (if the previous call to AsyncNext() timed out
// and we didn't remove the tag from the queue yet)
while(shutdown_cq.Next(&tag, &ok)) {
while (shutdown_cq.Next(&tag, &ok)) {
// Nothing to be done here
}
......
......@@ -226,6 +226,11 @@ class End2endTest : public ::testing::TestWithParam<TestScenario> {
kMaxMessageSize_(8192),
special_service_("special") {
GetParam().Log();
sync_server_settings_.max_pollers = INT_MAX;
sync_server_settings_.min_pollers = 1;
sync_server_settings_.cq_timeout_msec = 10;
sync_server_settings_.num_cqs = 4;
}
void TearDown() GRPC_OVERRIDE {
......@@ -250,6 +255,9 @@ class End2endTest : public ::testing::TestWithParam<TestScenario> {
builder.SetMaxMessageSize(
kMaxMessageSize_); // For testing max message size.
builder.RegisterService(&dup_pkg_service_);
builder.SetSyncServerSettings(sync_server_settings_);
server_ = builder.BuildAndStart();
is_server_started_ = true;
}
......@@ -279,6 +287,8 @@ class End2endTest : public ::testing::TestWithParam<TestScenario> {
ServerBuilder builder;
builder.AddListeningPort(proxyaddr.str(), InsecureServerCredentials());
builder.RegisterService(proxy_service_.get());
builder.SetSyncServerSettings(sync_server_settings_);
proxy_server_ = builder.BuildAndStart();
channel_ = CreateChannel(proxyaddr.str(), InsecureChannelCredentials());
......@@ -299,6 +309,7 @@ class End2endTest : public ::testing::TestWithParam<TestScenario> {
TestServiceImpl special_service_;
TestServiceImplDupPkg dup_pkg_service_;
grpc::string user_agent_prefix_;
ServerBuilder::SyncServerSettings sync_server_settings_;
};
static void SendRpc(grpc::testing::EchoTestService::Stub* stub, int num_rpcs,
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment