Skip to content
Snippets Groups Projects
Commit 3ea9e247 authored by Sree Kuchibhotla's avatar Sree Kuchibhotla
Browse files

some refactoring

parent 86edf015
Branches
Tags
No related merge requests found
...@@ -122,12 +122,13 @@ class Server GRPC_FINAL : public ServerInterface, ...@@ -122,12 +122,13 @@ class Server GRPC_FINAL : public ServerInterface,
/// Server constructors. To be used by \a ServerBuilder only. /// Server constructors. To be used by \a ServerBuilder only.
/// ///
/// \param thread_pool The threadpool instance to use for call processing. /// \param has_sync_methods Does this Server have any synchronous methods.
/// \param thread_pool_owned Does the server own the \a thread_pool instance? /// This information is useful to the server in creating some internal data
/// structures (completion queues / thread pools etc) to handle the incoming
/// RPCs corresponding to those sync methods
/// \param max_message_size Maximum message length that the channel can /// \param max_message_size Maximum message length that the channel can
/// receive. /// receive.
Server(ThreadPoolInterface* thread_pool, bool thread_pool_owned, Server(bool has_sync_methods, int max_message_size, ChannelArguments* args);
int max_message_size, ChannelArguments* args);
/// Register a service. This call does not take ownership of the service. /// Register a service. This call does not take ownership of the service.
/// The service must exist for the lifetime of the Server instance. /// The service must exist for the lifetime of the Server instance.
...@@ -180,7 +181,10 @@ class Server GRPC_FINAL : public ServerInterface, ...@@ -180,7 +181,10 @@ class Server GRPC_FINAL : public ServerInterface,
const int max_message_size_; const int max_message_size_;
// Completion queue. // The following completion queues used ONLY if the server has any services
// with sync methods. The queues are used as notification_cqs to get notified
// of the incoming RPCs
// std::vector<std::unique_ptr<CompletionQueue>> notification_cqs_;
CompletionQueue cq_; CompletionQueue cq_;
// Sever status // Sever status
...@@ -188,9 +192,11 @@ class Server GRPC_FINAL : public ServerInterface, ...@@ -188,9 +192,11 @@ class Server GRPC_FINAL : public ServerInterface,
bool started_; bool started_;
bool shutdown_; bool shutdown_;
bool shutdown_notified_; bool shutdown_notified_;
// TODO (sreek) : Remove num_running_cb_ and callback_cv_;
// The number of threads which are running callbacks. // The number of threads which are running callbacks.
int num_running_cb_; // int num_running_cb_;
grpc::condition_variable callback_cv_; // grpc::condition_variable callback_cv_;
grpc::condition_variable shutdown_cv_; grpc::condition_variable shutdown_cv_;
...@@ -204,10 +210,6 @@ class Server GRPC_FINAL : public ServerInterface, ...@@ -204,10 +210,6 @@ class Server GRPC_FINAL : public ServerInterface,
// Pointer to the c grpc server. // Pointer to the c grpc server.
grpc_server* server_; grpc_server* server_;
ThreadPoolInterface* thread_pool_;
// Whether the thread pool is created and owned by the server.
bool thread_pool_owned_;
std::unique_ptr<ServerInitializer> server_initializer_; std::unique_ptr<ServerInitializer> server_initializer_;
}; };
......
...@@ -276,19 +276,16 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag { ...@@ -276,19 +276,16 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag {
}; };
static internal::GrpcLibraryInitializer g_gli_initializer; static internal::GrpcLibraryInitializer g_gli_initializer;
Server::Server(ThreadPoolInterface* thread_pool, bool thread_pool_owned, Server::Server(bool has_sync_methods, int max_message_size,
int max_message_size, ChannelArguments* args) ChannelArguments* args)
: GrpcRpcManager(3, 5, 8), : GrpcRpcManager(3, 5, 8),
max_message_size_(max_message_size), max_message_size_(max_message_size),
started_(false), started_(false),
shutdown_(false), shutdown_(false),
shutdown_notified_(false), shutdown_notified_(false),
num_running_cb_(0),
sync_methods_(new std::list<SyncRequest>), sync_methods_(new std::list<SyncRequest>),
has_generic_service_(false), has_generic_service_(false),
server_(nullptr), server_(nullptr),
thread_pool_(thread_pool),
thread_pool_owned_(thread_pool_owned),
server_initializer_(new ServerInitializer(this)) { server_initializer_(new ServerInitializer(this)) {
g_gli_initializer.summon(); g_gli_initializer.summon();
gpr_once_init(&g_once_init_callbacks, InitGlobalCallbacks); gpr_once_init(&g_once_init_callbacks, InitGlobalCallbacks);
...@@ -297,7 +294,8 @@ Server::Server(ThreadPoolInterface* thread_pool, bool thread_pool_owned, ...@@ -297,7 +294,8 @@ Server::Server(ThreadPoolInterface* thread_pool, bool thread_pool_owned,
grpc_channel_args channel_args; grpc_channel_args channel_args;
args->SetChannelArgs(&channel_args); args->SetChannelArgs(&channel_args);
server_ = grpc_server_create(&channel_args, nullptr); server_ = grpc_server_create(&channel_args, nullptr);
if (thread_pool_ == nullptr) {
if (!has_sync_methods) {
grpc_server_register_non_listening_completion_queue(server_, cq_.cq(), grpc_server_register_non_listening_completion_queue(server_, cq_.cq(),
nullptr); nullptr);
} else { } else {
...@@ -320,9 +318,6 @@ Server::~Server() { ...@@ -320,9 +318,6 @@ Server::~Server() {
bool ok; bool ok;
GPR_ASSERT(!cq_.Next(&got_tag, &ok)); GPR_ASSERT(!cq_.Next(&got_tag, &ok));
grpc_server_destroy(server_); grpc_server_destroy(server_);
if (thread_pool_owned_) {
delete thread_pool_;
}
delete sync_methods_; delete sync_methods_;
} }
...@@ -418,12 +413,14 @@ bool Server::Start(ServerCompletionQueue** cqs, size_t num_cqs) { ...@@ -418,12 +413,14 @@ bool Server::Start(ServerCompletionQueue** cqs, size_t num_cqs) {
// proper constructor implicitly. Construct the object and use push_back. // proper constructor implicitly. Construct the object and use push_back.
sync_methods_->push_back(SyncRequest(unknown_method_.get(), nullptr)); sync_methods_->push_back(SyncRequest(unknown_method_.get(), nullptr));
} }
for (size_t i = 0; i < num_cqs; i++) { for (size_t i = 0; i < num_cqs; i++) {
if (cqs[i]->IsFrequentlyPolled()) { if (cqs[i]->IsFrequentlyPolled()) {
new UnimplementedAsyncRequest(this, cqs[i]); new UnimplementedAsyncRequest(this, cqs[i]);
} }
} }
} }
// Start processing rpcs. // Start processing rpcs.
if (!sync_methods_->empty()) { if (!sync_methods_->empty()) {
for (auto m = sync_methods_->begin(); m != sync_methods_->end(); m++) { for (auto m = sync_methods_->begin(); m != sync_methods_->end(); m++) {
...@@ -465,10 +462,13 @@ void Server::ShutdownInternal(gpr_timespec deadline) { ...@@ -465,10 +462,13 @@ void Server::ShutdownInternal(gpr_timespec deadline) {
} }
lock.lock(); lock.lock();
/* TODO (sreek) - Remove this block */
// Wait for running callbacks to finish. // Wait for running callbacks to finish.
/*
while (num_running_cb_ != 0) { while (num_running_cb_ != 0) {
callback_cv_.wait(lock); callback_cv_.wait(lock);
} }
*/
shutdown_notified_ = true; shutdown_notified_ = true;
shutdown_cv_.notify_all(); shutdown_cv_.notify_all();
......
...@@ -138,31 +138,32 @@ ServerBuilder& ServerBuilder::AddListeningPort( ...@@ -138,31 +138,32 @@ ServerBuilder& ServerBuilder::AddListeningPort(
} }
std::unique_ptr<Server> ServerBuilder::BuildAndStart() { std::unique_ptr<Server> ServerBuilder::BuildAndStart() {
std::unique_ptr<ThreadPoolInterface> thread_pool;
// == Determine if the server has any syncrhonous methods ==
bool has_sync_methods = false; bool has_sync_methods = false;
for (auto it = services_.begin(); it != services_.end(); ++it) { for (auto it = services_.begin(); it != services_.end(); ++it) {
if ((*it)->service->has_synchronous_methods()) { if ((*it)->service->has_synchronous_methods()) {
if (!thread_pool) {
thread_pool.reset(CreateDefaultThreadPool());
has_sync_methods = true; has_sync_methods = true;
break; break;
} }
} }
}
ChannelArguments args; if (!has_sync_methods) {
for (auto option = options_.begin(); option != options_.end(); ++option) {
(*option)->UpdateArguments(&args);
(*option)->UpdatePlugins(&plugins_);
}
if (!thread_pool) {
for (auto plugin = plugins_.begin(); plugin != plugins_.end(); plugin++) { for (auto plugin = plugins_.begin(); plugin != plugins_.end(); plugin++) {
if ((*plugin)->has_sync_methods()) { if ((*plugin)->has_sync_methods()) {
thread_pool.reset(CreateDefaultThreadPool());
has_sync_methods = true; has_sync_methods = true;
break; break;
} }
} }
} }
// == Channel args ==
ChannelArguments args;
for (auto option = options_.begin(); option != options_.end(); ++option) {
(*option)->UpdateArguments(&args);
(*option)->UpdatePlugins(&plugins_);
}
if (max_message_size_ > 0) { if (max_message_size_ > 0) {
args.SetInt(GRPC_ARG_MAX_MESSAGE_LENGTH, max_message_size_); args.SetInt(GRPC_ARG_MAX_MESSAGE_LENGTH, max_message_size_);
} }
...@@ -176,8 +177,10 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() { ...@@ -176,8 +177,10 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() {
args.SetInt(GRPC_COMPRESSION_CHANNEL_DEFAULT_ALGORITHM, args.SetInt(GRPC_COMPRESSION_CHANNEL_DEFAULT_ALGORITHM,
maybe_default_compression_algorithm_.algorithm); maybe_default_compression_algorithm_.algorithm);
} }
std::unique_ptr<Server> server( std::unique_ptr<Server> server(
new Server(thread_pool.release(), true, max_message_size_, &args)); new Server(has_sync_methods, max_message_size_, &args));
ServerInitializer* initializer = server->initializer(); ServerInitializer* initializer = server->initializer();
// If the server has atleast one sync methods, we know that this is a Sync // If the server has atleast one sync methods, we know that this is a Sync
...@@ -212,9 +215,11 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() { ...@@ -212,9 +215,11 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() {
return nullptr; return nullptr;
} }
} }
for (auto plugin = plugins_.begin(); plugin != plugins_.end(); plugin++) { for (auto plugin = plugins_.begin(); plugin != plugins_.end(); plugin++) {
(*plugin)->InitServer(initializer); (*plugin)->InitServer(initializer);
} }
if (generic_service_) { if (generic_service_) {
server->RegisterAsyncGenericService(generic_service_); server->RegisterAsyncGenericService(generic_service_);
} else { } else {
...@@ -227,6 +232,7 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() { ...@@ -227,6 +232,7 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() {
} }
} }
} }
for (auto port = ports_.begin(); port != ports_.end(); port++) { for (auto port = ports_.begin(); port != ports_.end(); port++) {
int r = server->AddListeningPort(port->addr, port->creds.get()); int r = server->AddListeningPort(port->addr, port->creds.get());
if (!r) return nullptr; if (!r) return nullptr;
...@@ -234,13 +240,16 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() { ...@@ -234,13 +240,16 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() {
*port->selected_port = r; *port->selected_port = r;
} }
} }
auto cqs_data = cqs_.empty() ? nullptr : &cqs_[0]; auto cqs_data = cqs_.empty() ? nullptr : &cqs_[0];
if (!server->Start(cqs_data, cqs_.size())) { if (!server->Start(cqs_data, cqs_.size())) {
return nullptr; return nullptr;
} }
for (auto plugin = plugins_.begin(); plugin != plugins_.end(); plugin++) { for (auto plugin = plugins_.begin(); plugin != plugins_.end(); plugin++) {
(*plugin)->Finish(initializer); (*plugin)->Finish(initializer);
} }
return server; return server;
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please to comment