From c4165776366a7bccd2a8571356e409a721093a97 Mon Sep 17 00:00:00 2001 From: Craig Tiller <ctiller@google.com> Date: Wed, 11 Feb 2015 10:51:04 -0800 Subject: [PATCH] Server progress --- Makefile | 6 +- build.json | 10 +- include/grpc++/completion_queue.h | 3 + include/grpc++/impl/call.h | 1 + include/grpc++/server.h | 19 +-- include/grpc++/server_context.h | 4 + src/cpp/server/server.cc | 125 ++++++++++++++++-- ...rver_context_impl.cc => server_context.cc} | 2 +- src/cpp/server/server_context_impl.h | 61 --------- 9 files changed, 133 insertions(+), 98 deletions(-) rename src/cpp/server/{server_context_impl.cc => server_context.cc} (97%) delete mode 100644 src/cpp/server/server_context_impl.h diff --git a/Makefile b/Makefile index 622181b15b..ea0ce66fbe 100644 --- a/Makefile +++ b/Makefile @@ -2645,7 +2645,7 @@ LIBGRPC++_SRC = \ src/cpp/proto/proto_utils.cc \ src/cpp/server/server.cc \ src/cpp/server/server_builder.cc \ - src/cpp/server/server_context_impl.cc \ + src/cpp/server/server_context.cc \ src/cpp/server/server_credentials.cc \ src/cpp/server/thread_pool.cc \ src/cpp/util/status.cc \ @@ -2702,7 +2702,7 @@ src/cpp/common/rpc_method.cc: $(OPENSSL_DEP) src/cpp/proto/proto_utils.cc: $(OPENSSL_DEP) src/cpp/server/server.cc: $(OPENSSL_DEP) src/cpp/server/server_builder.cc: $(OPENSSL_DEP) -src/cpp/server/server_context_impl.cc: $(OPENSSL_DEP) +src/cpp/server/server_context.cc: $(OPENSSL_DEP) src/cpp/server/server_credentials.cc: $(OPENSSL_DEP) src/cpp/server/thread_pool.cc: $(OPENSSL_DEP) src/cpp/util/status.cc: $(OPENSSL_DEP) @@ -2760,7 +2760,7 @@ objs/$(CONFIG)/src/cpp/common/rpc_method.o: objs/$(CONFIG)/src/cpp/proto/proto_utils.o: objs/$(CONFIG)/src/cpp/server/server.o: objs/$(CONFIG)/src/cpp/server/server_builder.o: -objs/$(CONFIG)/src/cpp/server/server_context_impl.o: +objs/$(CONFIG)/src/cpp/server/server_context.o: objs/$(CONFIG)/src/cpp/server/server_credentials.o: objs/$(CONFIG)/src/cpp/server/thread_pool.o: objs/$(CONFIG)/src/cpp/util/status.o: diff --git a/build.json b/build.json index e6993acd6e..77a737031a 100644 --- a/build.json +++ b/build.json @@ -428,7 +428,7 @@ "src/cpp/proto/proto_utils.cc", "src/cpp/server/server.cc", "src/cpp/server/server_builder.cc", - "src/cpp/server/server_context_impl.cc", + "src/cpp/server/server_context.cc", "src/cpp/server/server_credentials.cc", "src/cpp/server/thread_pool.cc", "src/cpp/util/status.cc", @@ -1621,7 +1621,6 @@ { "name": "qps_client", "build": "test", - "run": false, "language": "c++", "src": [ "test/cpp/qps/qpstest.proto", @@ -1634,12 +1633,12 @@ "grpc", "gpr_test_util", "gpr" - ] + ], + "run": false }, { "name": "qps_server", "build": "test", - "run": false, "language": "c++", "src": [ "test/cpp/qps/qpstest.proto", @@ -1652,7 +1651,8 @@ "grpc", "gpr_test_util", "gpr" - ] + ], + "run": false }, { "name": "ruby_plugin", diff --git a/include/grpc++/completion_queue.h b/include/grpc++/completion_queue.h index c976bd5b45..7f0677b4e5 100644 --- a/include/grpc++/completion_queue.h +++ b/include/grpc++/completion_queue.h @@ -54,6 +54,7 @@ template <class R, class W> class ServerReaderWriter; class CompletionQueue; +class Server; class CompletionQueueTag { public: @@ -67,6 +68,7 @@ class CompletionQueueTag { class CompletionQueue { public: CompletionQueue(); + explicit CompletionQueue(grpc_completion_queue *take); ~CompletionQueue(); // Blocking read from queue. @@ -87,6 +89,7 @@ class CompletionQueue { template <class R> friend class ::grpc::ServerReader; template <class W> friend class ::grpc::ServerWriter; template <class R, class W> friend class ::grpc::ServerReaderWriter; + friend class ::grpc::Server; friend Status BlockingUnaryCall(ChannelInterface *channel, const RpcMethod &method, ClientContext *context, const google::protobuf::Message &request, diff --git a/include/grpc++/impl/call.h b/include/grpc++/impl/call.h index edc6555b0c..11e193eec1 100644 --- a/include/grpc++/impl/call.h +++ b/include/grpc++/impl/call.h @@ -67,6 +67,7 @@ class CallOpBuffer final : public CompletionQueueTag { void AddRecvMessage(google::protobuf::Message *message); void AddClientSendClose(); void AddClientRecvStatus(Status *status); + void AddServerSendStatus(std::multimap<grpc::string, grpc::string> *metadata, const Status& status); // INTERNAL API: diff --git a/include/grpc++/server.h b/include/grpc++/server.h index eefd4457f9..b02c4130d9 100644 --- a/include/grpc++/server.h +++ b/include/grpc++/server.h @@ -69,24 +69,7 @@ class Server { private: friend class ServerBuilder; - class MethodRequestData { - public: - MethodRequestData(RpcServiceMethod* method, void* tag) : method_(method), tag_(tag) {} - static MethodRequestData *Wait(CompletionQueue *cq); - - void Request(CompletionQueue* cq); - - class CallData { - public: - explicit CallData(MethodRequestData *mrd); - - void Run(); - }; - - private: - RpcServiceMethod *const method_; - void *const tag_; - }; + class MethodRequestData; // ServerBuilder use only Server(ThreadPoolInterface* thread_pool, bool thread_pool_owned, ServerCredentials* creds); diff --git a/include/grpc++/server_context.h b/include/grpc++/server_context.h index 4af9fd6aaa..a58e63aff2 100644 --- a/include/grpc++/server_context.h +++ b/include/grpc++/server_context.h @@ -39,11 +39,15 @@ #include "config.h" +struct grpc_metadata; +struct gpr_timespec; + namespace grpc { // Interface of server side rpc context. class ServerContext { public: + ServerContext(gpr_timespec deadline, grpc_metadata *metadata, size_t metadata_count); virtual ~ServerContext() {} std::chrono::system_clock::time_point absolute_deadline(); diff --git a/src/cpp/server/server.cc b/src/cpp/server/server.cc index f5bbfdc6f7..02fb383394 100644 --- a/src/cpp/server/server.cc +++ b/src/cpp/server/server.cc @@ -43,9 +43,12 @@ #include <grpc++/server_credentials.h> #include <grpc++/thread_pool_interface.h> +#include "src/cpp/proto/proto_utils.h" + namespace grpc { -Server::Server(ThreadPoolInterface *thread_pool, bool thread_pool_owned, ServerCredentials *creds) +Server::Server(ThreadPoolInterface *thread_pool, bool thread_pool_owned, + ServerCredentials *creds) : started_(false), shutdown_(false), num_running_cb_(0), @@ -53,8 +56,7 @@ Server::Server(ThreadPoolInterface *thread_pool, bool thread_pool_owned, ServerC thread_pool_owned_(thread_pool_owned), secure_(creds != nullptr) { if (creds) { - server_ = - grpc_secure_server_create(creds->GetRawCreds(), nullptr, nullptr); + server_ = grpc_secure_server_create(creds->GetRawCreds(), nullptr, nullptr); } else { server_ = grpc_server_create(nullptr, nullptr); } @@ -87,7 +89,8 @@ bool Server::RegisterService(RpcService *service) { RpcServiceMethod *method = service->GetMethod(i); void *tag = grpc_server_register_method(server_, method->name(), nullptr); if (!tag) { - gpr_log(GPR_DEBUG, "Attempt to register %s multiple times", method->name()); + gpr_log(GPR_DEBUG, "Attempt to register %s multiple times", + method->name()); return false; } methods_.emplace_back(method, tag); @@ -104,6 +107,105 @@ int Server::AddPort(const grpc::string &addr) { } } +class Server::MethodRequestData final : public CompletionQueueTag { + public: + MethodRequestData(RpcServiceMethod *method, void *tag) + : method_(method), + tag_(tag), + has_request_payload_(method->method_type() == RpcMethod::NORMAL_RPC || + method->method_type() == + RpcMethod::SERVER_STREAMING), + has_response_payload_(method->method_type() == RpcMethod::NORMAL_RPC || + method->method_type() == + RpcMethod::CLIENT_STREAMING) { + grpc_metadata_array_init(&request_metadata_); + } + + static MethodRequestData *Wait(CompletionQueue *cq, bool *ok) { + void *tag; + if (!cq->Next(&tag, ok)) { + return nullptr; + } + auto *mrd = static_cast<MethodRequestData *>(tag); + GPR_ASSERT(mrd->in_flight_); + return mrd; + } + + void Request(grpc_server *server, CompletionQueue *cq) { + GPR_ASSERT(!in_flight_); + in_flight_ = true; + cq_ = grpc_completion_queue_create(); + GPR_ASSERT(GRPC_CALL_OK == + grpc_server_request_registered_call( + server, tag_, &call_, &deadline_, &request_metadata_, + has_request_payload_ ? &request_payload_ : nullptr, cq->cq(), + cq_, this)); + } + + void FinalizeResult(void *tag, bool *status) override {} + + class CallData { + public: + explicit CallData(MethodRequestData *mrd) + : cq_(mrd->cq_), + call_(mrd->call_, nullptr, &cq_), + ctx_(mrd->deadline_, mrd->request_metadata_.metadata, + mrd->request_metadata_.count), + has_request_payload_(mrd->has_request_payload_), + has_response_payload_(mrd->has_response_payload_), + request_payload_(mrd->request_payload_), + method_(mrd->method_) { + GPR_ASSERT(mrd->in_flight_); + mrd->in_flight_ = false; + mrd->request_metadata_.count = 0; + } + + void Run() { + std::unique_ptr<google::protobuf::Message> req; + std::unique_ptr<google::protobuf::Message> res; + if (has_request_payload_) { + req.reset(method_->AllocateRequestProto()); + if (!DeserializeProto(request_payload_, req.get())) { + abort(); // for now + } + } + if (has_response_payload_) { + req.reset(method_->AllocateResponseProto()); + } + auto status = method_->handler()->RunHandler( + MethodHandler::HandlerParameter(&call_, &ctx_, req.get(), res.get())); + CallOpBuffer buf; + buf.AddServerSendStatus(nullptr, status); + if (has_response_payload_) { + buf.AddSendMessage(*res); + } + call_.PerformOps(&buf); + GPR_ASSERT(cq_.Pluck(&buf)); + } + + private: + CompletionQueue cq_; + Call call_; + ServerContext ctx_; + const bool has_request_payload_; + const bool has_response_payload_; + grpc_byte_buffer *request_payload_; + RpcServiceMethod *const method_; + }; + + private: + RpcServiceMethod *const method_; + void *const tag_; + bool in_flight_ = false; + const bool has_request_payload_; + const bool has_response_payload_; + grpc_call *call_; + gpr_timespec deadline_; + grpc_metadata_array request_metadata_; + grpc_byte_buffer *request_payload_; + grpc_completion_queue *cq_; +}; + bool Server::Start() { GPR_ASSERT(!started_); started_ = true; @@ -111,8 +213,8 @@ bool Server::Start() { // Start processing rpcs. if (cq_sync_) { - for (auto& m : methods_) { - m.Request(cq_sync_.get()); + for (auto &m : methods_) { + m.Request(server_, cq_sync_.get()); } ScheduleCallback(); @@ -146,14 +248,17 @@ void Server::ScheduleCallback() { void Server::RunRpc() { // Wait for one more incoming rpc. - auto* mrd = MethodRequestData::Wait(cq_sync_.get()); + bool ok; + auto *mrd = MethodRequestData::Wait(cq_sync_.get(), &ok); if (mrd) { MethodRequestData::CallData cd(mrd); - mrd->Request(cq_sync_.get()); - ScheduleCallback(); + if (ok) { + mrd->Request(server_, cq_sync_.get()); + ScheduleCallback(); - cd.Run(); + cd.Run(); + } } { diff --git a/src/cpp/server/server_context_impl.cc b/src/cpp/server/server_context.cc similarity index 97% rename from src/cpp/server/server_context_impl.cc rename to src/cpp/server/server_context.cc index 467cc80e05..0edadd8709 100644 --- a/src/cpp/server/server_context_impl.cc +++ b/src/cpp/server/server_context.cc @@ -31,6 +31,6 @@ * */ -#include "src/cpp/server/server_context_impl.h" +#include <grpc++/server_context.h> namespace grpc {} // namespace grpc diff --git a/src/cpp/server/server_context_impl.h b/src/cpp/server/server_context_impl.h deleted file mode 100644 index c6016b7635..0000000000 --- a/src/cpp/server/server_context_impl.h +++ /dev/null @@ -1,61 +0,0 @@ -/* - * - * Copyright 2014, Google Inc. - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - */ - -#ifndef __GRPCPP_INTERNAL_SERVER_SERVER_CONTEXT_IMPL_H_ -#define __GRPCPP_INTERNAL_SERVER_SERVER_CONTEXT_IMPL_H_ - -#include <grpc++/server_context.h> - -#include <chrono> - -#include <grpc/support/time.h> - -namespace grpc { - -class ServerContextImpl : public ServerContext { - public: - explicit ServerContextImpl(std::chrono::system_clock::time_point deadline) - : absolute_deadline_(deadline) {} - ~ServerContextImpl() {} - - std::chrono::system_clock::time_point absolute_deadline() const { - return absolute_deadline_; - } - - private: - std::chrono::system_clock::time_point absolute_deadline_; -}; - -} // namespace grpc - -#endif // __GRPCPP_INTERNAL_SERVER_SERVER_CONTEXT_IMPL_H_ -- GitLab