diff --git a/include/grpc++/impl/call.h b/include/grpc++/impl/call.h index bc1db4c12c0a40e27d69e2a0bd94866c99fe4d7d..35338a413e25d95ce8a2544c731173df332cd9d1 100644 --- a/include/grpc++/impl/call.h +++ b/include/grpc++/impl/call.h @@ -541,8 +541,7 @@ class CallOpSet : public CallOpSetInterface, template <class Op1 = CallNoOp<1>, class Op2 = CallNoOp<2>, class Op3 = CallNoOp<3>, class Op4 = CallNoOp<4>, class Op5 = CallNoOp<5>, class Op6 = CallNoOp<6>> -class SneakyCallOpSet GRPC_FINAL - : public CallOpSet<Op1, Op2, Op3, Op4, Op5, Op6> { +class SneakyCallOpSet : public CallOpSet<Op1, Op2, Op3, Op4, Op5, Op6> { public: bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE { typedef CallOpSet<Op1, Op2, Op3, Op4, Op5, Op6> Base; diff --git a/include/grpc++/impl/rpc_service_method.h b/include/grpc++/impl/rpc_service_method.h index 925801e1ceea8884f2632dcf858205e0b5866d59..c02ebec19ebc3e3b1d3bc09208e4fdc523d671e9 100644 --- a/include/grpc++/impl/rpc_service_method.h +++ b/include/grpc++/impl/rpc_service_method.h @@ -211,13 +211,19 @@ class BidiStreamingHandler : public MethodHandler { // Handle unknown method by returning UNIMPLEMENTED error. class UnknownMethodHandler : public MethodHandler { public: - void RunHandler(const HandlerParameter& param) GRPC_FINAL { + template <class T> + static void FillOps(ServerContext* context, T* ops) { Status status(StatusCode::UNIMPLEMENTED, ""); - CallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> ops; - if (!param.server_context->sent_initial_metadata_) { - ops.SendInitialMetadata(param.server_context->initial_metadata_); + if (!context->sent_initial_metadata_) { + ops->SendInitialMetadata(context->initial_metadata_); + context->sent_initial_metadata_ = true; } - ops.ServerSendStatus(param.server_context->trailing_metadata_, status); + ops->ServerSendStatus(context->trailing_metadata_, status); + } + + void RunHandler(const HandlerParameter& param) GRPC_FINAL { + CallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> ops; + FillOps(param.server_context, &ops); param.call->PerformOps(&ops); param.call->cq()->Pluck(&ops); } diff --git a/include/grpc++/server.h b/include/grpc++/server.h index a2bc097c7f759e3260da7d980314c5fb0629bf48..3cff07fb80c82fd9b2218ca08e1c685c99ec37b2 100644 --- a/include/grpc++/server.h +++ b/include/grpc++/server.h @@ -98,7 +98,7 @@ class Server GRPC_FINAL : public GrpcLibrary, private CallHook { // Add a listening port. Can be called multiple times. int AddListeningPort(const grpc::string& addr, ServerCredentials* creds); // Start the server. 
- bool Start(); + bool Start(ServerCompletionQueue** cqs, size_t num_cqs); void HandleQueueClosed(); void RunRpc(); @@ -112,7 +112,8 @@ class Server GRPC_FINAL : public GrpcLibrary, private CallHook { public: BaseAsyncRequest(Server* server, ServerContext* context, ServerAsyncStreamingInterface* stream, - CompletionQueue* call_cq, void* tag); + CompletionQueue* call_cq, void* tag, + bool delete_on_finalize); virtual ~BaseAsyncRequest(); bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE; @@ -123,6 +124,7 @@ class Server GRPC_FINAL : public GrpcLibrary, private CallHook { ServerAsyncStreamingInterface* const stream_; CompletionQueue* const call_cq_; void* const tag_; + const bool delete_on_finalize_; grpc_call* call_; grpc_metadata_array initial_metadata_array_; }; @@ -184,12 +186,13 @@ class Server GRPC_FINAL : public GrpcLibrary, private CallHook { Message* const request_; }; - class GenericAsyncRequest GRPC_FINAL : public BaseAsyncRequest { + class GenericAsyncRequest : public BaseAsyncRequest { public: GenericAsyncRequest(Server* server, GenericServerContext* context, ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, - ServerCompletionQueue* notification_cq, void* tag); + ServerCompletionQueue* notification_cq, void* tag, + bool delete_on_finalize); bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE; @@ -197,6 +200,10 @@ class Server GRPC_FINAL : public GrpcLibrary, private CallHook { grpc_call_details call_details_; }; + class UnimplementedAsyncRequestContext; + class UnimplementedAsyncRequest; + class UnimplementedAsyncResponse; + template <class Message> void RequestAsyncCall(void* registered_method, ServerContext* context, ServerAsyncStreamingInterface* stream, @@ -221,7 +228,7 @@ class Server GRPC_FINAL : public GrpcLibrary, private CallHook { ServerCompletionQueue* notification_cq, void* tag) { new GenericAsyncRequest(this, context, stream, call_cq, notification_cq, - tag); + tag, true); } const int max_message_size_; diff --git a/include/grpc++/server_builder.h b/include/grpc++/server_builder.h index 906daf13709417c80af3c01e0ca4627af40ab453..8666252d51e80ad96fbe6e0b477bcb5ac455332d 100644 --- a/include/grpc++/server_builder.h +++ b/include/grpc++/server_builder.h @@ -101,8 +101,8 @@ class ServerBuilder { void SetThreadPool(ThreadPoolInterface* thread_pool); // Add a completion queue for handling asynchronous services - // Caller is required to keep this completion queue live until calling - // BuildAndStart() + // Caller is required to keep this completion queue live until + // the server is destroyed. std::unique_ptr<ServerCompletionQueue> AddCompletionQueue(); // Return a running server which is ready for processing rpcs. 
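Not part of the patch itself, but worth spelling out: the AddCompletionQueue() comment above changes because Server::Start() now posts an internal UnimplementedAsyncRequest on every server completion queue when no generic service is registered, so each queue must stay alive, and be drained, until the server is destroyed. A minimal shutdown-ordering sketch using only the public API visible in these headers (service registration and ports are elided, and the surrounding setup is illustrative rather than code from this patch):

#include <memory>
#include <grpc++/completion_queue.h>
#include <grpc++/server.h>
#include <grpc++/server_builder.h>

void RunAsyncServer() {
  grpc::ServerBuilder builder;
  // ... AddListeningPort() / register async services here (elided) ...
  std::unique_ptr<grpc::ServerCompletionQueue> cq = builder.AddCompletionQueue();
  std::unique_ptr<grpc::Server> server = builder.BuildAndStart();

  // ... drive RPC completions from *cq, typically on worker threads ...

  server->Shutdown();  // stop the server first
  cq->Shutdown();      // then shut the queue down ...
  void* tag;
  bool ok;
  while (cq->Next(&tag, &ok)) {
  }  // ... and drain it, which also consumes the internal
     // unimplemented-method tags the server enqueued
  // cq is declared before server, so the server is destroyed first at scope
  // exit and the queue stays live until the server is gone, as the revised
  // comment requires.
}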
diff --git a/include/grpc++/stream.h b/include/grpc++/stream.h index 45dafcd282266ff54d4acd8867f45424f0332111..4bffaffb407f7d27763eca4e5a0412190dd8ccaf 100644 --- a/include/grpc++/stream.h +++ b/include/grpc++/stream.h @@ -761,6 +761,8 @@ class ServerAsyncReaderWriter GRPC_FINAL : public ServerAsyncStreamingInterface, } private: + friend class ::grpc::Server; + void BindCall(Call* call) GRPC_OVERRIDE { call_ = *call; } Call call_; diff --git a/src/core/iomgr/pollset.h b/src/core/iomgr/pollset.h index c474e4dbf1cdf6f5fb80189c2e14a45181780cb6..337596cb741e33cd1a449f3888626e9b0900d9ce 100644 --- a/src/core/iomgr/pollset.h +++ b/src/core/iomgr/pollset.h @@ -74,10 +74,9 @@ void grpc_pollset_destroy(grpc_pollset *pollset); grpc_pollset_work, and it is guaranteed that GRPC_POLLSET_MU(pollset) will not be released by grpc_pollset_work AFTER worker has been destroyed. - Returns true if some work has been done, and false if the deadline - expired. */ -int grpc_pollset_work(grpc_pollset *pollset, grpc_pollset_worker *worker, - gpr_timespec deadline); + Tries not to block past deadline. */ +void grpc_pollset_work(grpc_pollset *pollset, grpc_pollset_worker *worker, + gpr_timespec now, gpr_timespec deadline); /* Break one polling thread out of polling work for this pollset. If specific_worker is GRPC_POLLSET_KICK_BROADCAST, kick ALL the workers. diff --git a/src/core/iomgr/pollset_multipoller_with_epoll.c b/src/core/iomgr/pollset_multipoller_with_epoll.c index 5ea9dd210106307b6a8d401b76f603fec2c06db9..fe66ebed77914c35113c75da8c7022630a86735e 100644 --- a/src/core/iomgr/pollset_multipoller_with_epoll.c +++ b/src/core/iomgr/pollset_multipoller_with_epoll.c @@ -181,7 +181,7 @@ static void multipoll_with_epoll_pollset_maybe_work( pfds[1].events = POLLIN; pfds[1].revents = 0; - poll_rv = poll(pfds, 2, timeout_ms); + poll_rv = grpc_poll_function(pfds, 2, timeout_ms); if (poll_rv < 0) { if (errno != EINTR) { diff --git a/src/core/iomgr/pollset_multipoller_with_poll_posix.c b/src/core/iomgr/pollset_multipoller_with_poll_posix.c index 001fcecf761037e31163b7a36c5383614bee4803..30ee6e24db4132a23e11b13c3d6674df399b1c04 100644 --- a/src/core/iomgr/pollset_multipoller_with_poll_posix.c +++ b/src/core/iomgr/pollset_multipoller_with_poll_posix.c @@ -144,7 +144,7 @@ static void multipoll_with_poll_pollset_maybe_work( POLLOUT, &watchers[i]); } - r = poll(pfds, pfd_count, timeout); + r = grpc_poll_function(pfds, pfd_count, timeout); for (i = 1; i < pfd_count; i++) { grpc_fd_end_poll(&watchers[i], pfds[i].revents & POLLIN, diff --git a/src/core/iomgr/pollset_posix.c b/src/core/iomgr/pollset_posix.c index a01f9ff7278b79bf04e9cb8a4a28fa3f3781c2c7..6bd1b61f242da62e67a2a2aa2a3702d2f3c2b871 100644 --- a/src/core/iomgr/pollset_posix.c +++ b/src/core/iomgr/pollset_posix.c @@ -38,7 +38,6 @@ #include "src/core/iomgr/pollset_posix.h" #include <errno.h> -#include <poll.h> #include <stdlib.h> #include <string.h> #include <unistd.h> @@ -57,6 +56,8 @@ GPR_TLS_DECL(g_current_thread_poller); GPR_TLS_DECL(g_current_thread_worker); +grpc_poll_function_type grpc_poll_function = poll; + static void remove_worker(grpc_pollset *p, grpc_pollset_worker *worker) { worker->prev->next = worker->next; worker->next->prev = worker->prev; @@ -89,6 +90,7 @@ static void push_front_worker(grpc_pollset *p, grpc_pollset_worker *worker) { } void grpc_pollset_kick(grpc_pollset *p, grpc_pollset_worker *specific_worker) { + /* pollset->mu already held */ if (specific_worker != NULL) { if (specific_worker == GRPC_POLLSET_KICK_BROADCAST) { for (specific_worker = 
p->root_worker.next; @@ -168,14 +170,10 @@ static void finish_shutdown(grpc_pollset *pollset) { pollset->shutdown_done_cb(pollset->shutdown_done_arg); } -int grpc_pollset_work(grpc_pollset *pollset, grpc_pollset_worker *worker, - gpr_timespec deadline) { +void grpc_pollset_work(grpc_pollset *pollset, grpc_pollset_worker *worker, + gpr_timespec now, gpr_timespec deadline) { /* pollset->mu already held */ - gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC); int added_worker = 0; - if (gpr_time_cmp(now, deadline) > 0) { - return 0; - } /* this must happen before we (potentially) drop pollset->mu */ worker->next = worker->prev = NULL; /* TODO(ctiller): pool these */ @@ -217,7 +215,6 @@ done: gpr_mu_lock(&pollset->mu); } } - return 1; } void grpc_pollset_shutdown(grpc_pollset *pollset, @@ -456,7 +453,7 @@ static void basic_pollset_maybe_work(grpc_pollset *pollset, /* poll fd count (argument 2) is shortened by one if we have no events to poll on - such that it only includes the kicker */ - r = poll(pfd, nfds, timeout); + r = grpc_poll_function(pfd, nfds, timeout); GRPC_TIMER_MARK(GRPC_PTAG_POLL_FINISHED, r); if (fd) { diff --git a/src/core/iomgr/pollset_posix.h b/src/core/iomgr/pollset_posix.h index a3ea353de6edb131a380c1bc60b2c15c30e0e5d1..69bd9cca8ce02899d1c4d53ecd58f79d96298990 100644 --- a/src/core/iomgr/pollset_posix.h +++ b/src/core/iomgr/pollset_posix.h @@ -34,6 +34,8 @@ #ifndef GRPC_INTERNAL_CORE_IOMGR_POLLSET_POSIX_H #define GRPC_INTERNAL_CORE_IOMGR_POLLSET_POSIX_H +#include <poll.h> + #include <grpc/support/sync.h> #include "src/core/iomgr/wakeup_fd_posix.h" @@ -118,4 +120,8 @@ void grpc_poll_become_multipoller(grpc_pollset *pollset, struct grpc_fd **fds, * be locked) */ int grpc_pollset_has_workers(grpc_pollset *pollset); +/* override to allow tests to hook poll() usage */ +typedef int (*grpc_poll_function_type)(struct pollfd *, nfds_t, int); +extern grpc_poll_function_type grpc_poll_function; + #endif /* GRPC_INTERNAL_CORE_IOMGR_POLLSET_POSIX_H */ diff --git a/src/core/iomgr/pollset_windows.c b/src/core/iomgr/pollset_windows.c index 8710395ab3d5334e9dedfae54e2a27e358ab191f..07522c8a0c3f0a8cb6760c6bc52e86b6f84391f3 100644 --- a/src/core/iomgr/pollset_windows.c +++ b/src/core/iomgr/pollset_windows.c @@ -99,14 +99,9 @@ void grpc_pollset_destroy(grpc_pollset *pollset) { gpr_mu_destroy(&pollset->mu); } -int grpc_pollset_work(grpc_pollset *pollset, grpc_pollset_worker *worker, - gpr_timespec deadline) { - gpr_timespec now; +void grpc_pollset_work(grpc_pollset *pollset, grpc_pollset_worker *worker, + gpr_timespec now, gpr_timespec deadline) { int added_worker = 0; - now = gpr_now(GPR_CLOCK_MONOTONIC); - if (gpr_time_cmp(now, deadline) > 0) { - return 0 /* GPR_FALSE */; - } worker->next = worker->prev = NULL; gpr_cv_init(&worker->cv); if (grpc_maybe_call_delayed_callbacks(&pollset->mu, 1 /* GPR_TRUE */)) { @@ -127,7 +122,6 @@ done: if (added_worker) { remove_worker(pollset, worker); } - return 1 /* GPR_TRUE */; } void grpc_pollset_kick(grpc_pollset *p, grpc_pollset_worker *specific_worker) { diff --git a/src/core/security/google_default_credentials.c b/src/core/security/google_default_credentials.c index d6092ece320bf3630896b9a1f4e38ecc99078503..3631de867af8e4d2f38244f7940625711fbbda0c 100644 --- a/src/core/security/google_default_credentials.c +++ b/src/core/security/google_default_credentials.c @@ -115,7 +115,7 @@ static int is_stack_running_on_compute_engine(void) { gpr_mu_lock(GRPC_POLLSET_MU(&detector.pollset)); while (!detector.is_done) { grpc_pollset_worker worker; - 
grpc_pollset_work(&detector.pollset, &worker, + grpc_pollset_work(&detector.pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC), gpr_inf_future(GPR_CLOCK_MONOTONIC)); } gpr_mu_unlock(GRPC_POLLSET_MU(&detector.pollset)); diff --git a/src/core/surface/completion_queue.c b/src/core/surface/completion_queue.c index 77443a7ae8b382c51092dc911771355c1659c773..b58115a93f8aac1091d2aaa2bc99a1eb3d80f77a 100644 --- a/src/core/surface/completion_queue.c +++ b/src/core/surface/completion_queue.c @@ -170,6 +170,9 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc, gpr_timespec deadline, void *reserved) { grpc_event ret; grpc_pollset_worker worker; + int first_loop = 1; + gpr_timespec now; + GPR_ASSERT(!reserved); deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC); @@ -196,12 +199,15 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc, ret.type = GRPC_QUEUE_SHUTDOWN; break; } - if (!grpc_pollset_work(&cc->pollset, &worker, deadline)) { + now = gpr_now(GPR_CLOCK_MONOTONIC); + if (!first_loop && gpr_time_cmp(now, deadline) >= 0) { gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); memset(&ret, 0, sizeof(ret)); ret.type = GRPC_QUEUE_TIMEOUT; break; } + first_loop = 0; + grpc_pollset_work(&cc->pollset, &worker, now, deadline); } GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ret); GRPC_CQ_INTERNAL_UNREF(cc, "next"); @@ -239,6 +245,9 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag, grpc_cq_completion *c; grpc_cq_completion *prev; grpc_pollset_worker worker; + gpr_timespec now; + int first_loop = 1; + GPR_ASSERT(!reserved); deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC); @@ -281,13 +290,16 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag, ret.type = GRPC_QUEUE_TIMEOUT; break; } - if (!grpc_pollset_work(&cc->pollset, &worker, deadline)) { + now = gpr_now(GPR_CLOCK_MONOTONIC); + if (!first_loop && gpr_time_cmp(now, deadline) >= 0) { del_plucker(cc, tag, &worker); gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); memset(&ret, 0, sizeof(ret)); ret.type = GRPC_QUEUE_TIMEOUT; break; } + first_loop = 0; + grpc_pollset_work(&cc->pollset, &worker, now, deadline); del_plucker(cc, tag, &worker); } done: diff --git a/src/cpp/server/server.cc b/src/cpp/server/server.cc index e039c07374d89759c76fd48701cb4f5310a3b0f5..1fe8f82dd0c436a8ebe212963d7f546f4e081e3a 100644 --- a/src/cpp/server/server.cc +++ b/src/cpp/server/server.cc @@ -50,6 +50,52 @@ namespace grpc { +class Server::UnimplementedAsyncRequestContext { + protected: + UnimplementedAsyncRequestContext() : generic_stream_(&server_context_) {} + + GenericServerContext server_context_; + GenericServerAsyncReaderWriter generic_stream_; +}; + +class Server::UnimplementedAsyncRequest GRPC_FINAL + : public UnimplementedAsyncRequestContext, + public GenericAsyncRequest { + public: + UnimplementedAsyncRequest(Server* server, ServerCompletionQueue* cq) + : GenericAsyncRequest(server, &server_context_, &generic_stream_, cq, cq, + NULL, false), + server_(server), + cq_(cq) {} + + bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE; + + ServerContext* context() { return &server_context_; } + GenericServerAsyncReaderWriter* stream() { return &generic_stream_; } + + private: + Server* const server_; + ServerCompletionQueue* const cq_; +}; + +typedef SneakyCallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> + UnimplementedAsyncResponseOp; +class Server::UnimplementedAsyncResponse GRPC_FINAL + : public UnimplementedAsyncResponseOp { + public: + 
UnimplementedAsyncResponse(UnimplementedAsyncRequest* request); + ~UnimplementedAsyncResponse() { delete request_; } + + bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE { + bool r = UnimplementedAsyncResponseOp::FinalizeResult(tag, status); + delete this; + return r; + } + + private: + UnimplementedAsyncRequest* const request_; +}; + class Server::ShutdownRequest GRPC_FINAL : public CompletionQueueTag { public: bool FinalizeResult(void** tag, bool* status) { @@ -297,18 +343,23 @@ int Server::AddListeningPort(const grpc::string& addr, return creds->AddPortToServer(addr, server_); } -bool Server::Start() { +bool Server::Start(ServerCompletionQueue** cqs, size_t num_cqs) { GPR_ASSERT(!started_); started_ = true; grpc_server_start(server_); if (!has_generic_service_) { - unknown_method_.reset(new RpcServiceMethod( - "unknown", RpcMethod::BIDI_STREAMING, new UnknownMethodHandler)); - // Use of emplace_back with just constructor arguments is not accepted here - // by gcc-4.4 because it can't match the anonymous nullptr with a proper - // constructor implicitly. Construct the object and use push_back. - sync_methods_->push_back(SyncRequest(unknown_method_.get(), nullptr)); + if (!sync_methods_->empty()) { + unknown_method_.reset(new RpcServiceMethod( + "unknown", RpcMethod::BIDI_STREAMING, new UnknownMethodHandler)); + // Use of emplace_back with just constructor arguments is not accepted + // here by gcc-4.4 because it can't match the anonymous nullptr with a + // proper constructor implicitly. Construct the object and use push_back. + sync_methods_->push_back(SyncRequest(unknown_method_.get(), nullptr)); + } + for (size_t i = 0; i < num_cqs; i++) { + new UnimplementedAsyncRequest(this, cqs[i]); + } } // Start processing rpcs. if (!sync_methods_->empty()) { @@ -370,12 +421,14 @@ void Server::PerformOpsOnCall(CallOpSetInterface* ops, Call* call) { Server::BaseAsyncRequest::BaseAsyncRequest( Server* server, ServerContext* context, - ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, void* tag) + ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, void* tag, + bool delete_on_finalize) : server_(server), context_(context), stream_(stream), call_cq_(call_cq), tag_(tag), + delete_on_finalize_(delete_on_finalize), call_(nullptr) { memset(&initial_metadata_array_, 0, sizeof(initial_metadata_array_)); } @@ -402,14 +455,16 @@ bool Server::BaseAsyncRequest::FinalizeResult(void** tag, bool* status) { // just the pointers inside call are copied here stream_->BindCall(&call); *tag = tag_; - delete this; + if (delete_on_finalize_) { + delete this; + } return true; } Server::RegisteredAsyncRequest::RegisteredAsyncRequest( Server* server, ServerContext* context, ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, void* tag) - : BaseAsyncRequest(server, context, stream, call_cq, tag) {} + : BaseAsyncRequest(server, context, stream, call_cq, tag, true) {} void Server::RegisteredAsyncRequest::IssueRequest( void* registered_method, grpc_byte_buffer** payload, @@ -423,8 +478,9 @@ void Server::RegisteredAsyncRequest::IssueRequest( Server::GenericAsyncRequest::GenericAsyncRequest( Server* server, GenericServerContext* context, ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, - ServerCompletionQueue* notification_cq, void* tag) - : BaseAsyncRequest(server, context, stream, call_cq, tag) { + ServerCompletionQueue* notification_cq, void* tag, bool delete_on_finalize) + : BaseAsyncRequest(server, context, stream, call_cq, tag, + delete_on_finalize) { 
grpc_call_details_init(&call_details_); GPR_ASSERT(notification_cq); GPR_ASSERT(call_cq); @@ -445,6 +501,25 @@ bool Server::GenericAsyncRequest::FinalizeResult(void** tag, bool* status) { return BaseAsyncRequest::FinalizeResult(tag, status); } +bool Server::UnimplementedAsyncRequest::FinalizeResult(void** tag, + bool* status) { + if (GenericAsyncRequest::FinalizeResult(tag, status) && *status) { + new UnimplementedAsyncRequest(server_, cq_); + new UnimplementedAsyncResponse(this); + } else { + delete this; + } + return false; +} + +Server::UnimplementedAsyncResponse::UnimplementedAsyncResponse( + UnimplementedAsyncRequest* request) + : request_(request) { + Status status(StatusCode::UNIMPLEMENTED, ""); + UnknownMethodHandler::FillOps(request_->context(), this); + request_->stream()->call_.PerformOps(this); +} + void Server::ScheduleCallback() { { grpc::unique_lock<grpc::mutex> lock(mu_); diff --git a/src/cpp/server/server_builder.cc b/src/cpp/server/server_builder.cc index 0b11d8617362863f21aa327440588a15f5e06d6d..a13269abbfab6d99d6ea7362b1c7281bd52b6e10 100644 --- a/src/cpp/server/server_builder.cc +++ b/src/cpp/server/server_builder.cc @@ -103,12 +103,6 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() { thread_pool_ = CreateDefaultThreadPool(); thread_pool_owned = true; } - // Async services only, create a thread pool to handle requests to unknown - // services. - if (!thread_pool_ && !generic_service_ && !async_services_.empty()) { - thread_pool_ = new FixedSizeThreadPool(1); - thread_pool_owned = true; - } std::unique_ptr<Server> server( new Server(thread_pool_, thread_pool_owned, max_message_size_)); for (auto cq = cqs_.begin(); cq != cqs_.end(); ++cq) { @@ -138,7 +132,7 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() { *port->selected_port = r; } } - if (!server->Start()) { + if (!server->Start(&cqs_[0], cqs_.size())) { return nullptr; } return server; diff --git a/test/core/httpcli/httpcli_test.c b/test/core/httpcli/httpcli_test.c index 8dddfbee9868cd4e295c8bce2fe1d8efdf81377c..42b2661c0a37a35e06364695bbb5eae1fe9b0e88 100644 --- a/test/core/httpcli/httpcli_test.c +++ b/test/core/httpcli/httpcli_test.c @@ -88,7 +88,8 @@ static void test_get(int use_ssl, int port) { gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); while (!g_done) { grpc_pollset_worker worker; - grpc_pollset_work(&g_pollset, &worker, n_seconds_time(20)); + grpc_pollset_work(&g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC), + n_seconds_time(20)); } gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); gpr_free(host); @@ -114,7 +115,8 @@ static void test_post(int use_ssl, int port) { gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); while (!g_done) { grpc_pollset_worker worker; - grpc_pollset_work(&g_pollset, &worker, n_seconds_time(20)); + grpc_pollset_work(&g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC), + n_seconds_time(20)); } gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); gpr_free(host); diff --git a/test/core/iomgr/endpoint_tests.c b/test/core/iomgr/endpoint_tests.c index 8186c96da1b190e80c0cfbb97d503afe8c946948..6ef8e9ca3bc9420bf6de12f6df7a054036a1d1db 100644 --- a/test/core/iomgr/endpoint_tests.c +++ b/test/core/iomgr/endpoint_tests.c @@ -256,7 +256,8 @@ static void read_and_write_test(grpc_endpoint_test_config config, while (!state.read_done || !state.write_done) { grpc_pollset_worker worker; GPR_ASSERT(gpr_time_cmp(gpr_now(GPR_CLOCK_MONOTONIC), deadline) < 0); - grpc_pollset_work(g_pollset, &worker, deadline); + grpc_pollset_work(g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC), + deadline); } 
gpr_mu_unlock(GRPC_POLLSET_MU(g_pollset)); @@ -353,7 +354,8 @@ static void shutdown_during_write_test(grpc_endpoint_test_config config, while (!write_st.done) { grpc_pollset_worker worker; GPR_ASSERT(gpr_time_cmp(gpr_now(deadline.clock_type), deadline) < 0); - grpc_pollset_work(g_pollset, &worker, deadline); + grpc_pollset_work(g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC), + deadline); } gpr_mu_unlock(GRPC_POLLSET_MU(g_pollset)); grpc_endpoint_destroy(write_st.ep); @@ -361,7 +363,8 @@ static void shutdown_during_write_test(grpc_endpoint_test_config config, while (!read_st.done) { grpc_pollset_worker worker; GPR_ASSERT(gpr_time_cmp(gpr_now(deadline.clock_type), deadline) < 0); - grpc_pollset_work(g_pollset, &worker, deadline); + grpc_pollset_work(g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC), + deadline); } gpr_mu_unlock(GRPC_POLLSET_MU(g_pollset)); gpr_free(slices); diff --git a/test/core/iomgr/fd_posix_test.c b/test/core/iomgr/fd_posix_test.c index adcbcafdbbaa3794016f375962da39eb7a98f736..8bba87d61fc68aa9a5cb072007de89c77c5d21bf 100644 --- a/test/core/iomgr/fd_posix_test.c +++ b/test/core/iomgr/fd_posix_test.c @@ -250,7 +250,8 @@ static void server_wait_and_shutdown(server *sv) { gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); while (!sv->done) { grpc_pollset_worker worker; - grpc_pollset_work(&g_pollset, &worker, gpr_inf_future(GPR_CLOCK_MONOTONIC)); + grpc_pollset_work(&g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC), + gpr_inf_future(GPR_CLOCK_MONOTONIC)); } gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); } @@ -358,7 +359,8 @@ static void client_wait_and_shutdown(client *cl) { gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); while (!cl->done) { grpc_pollset_worker worker; - grpc_pollset_work(&g_pollset, &worker, gpr_inf_future(GPR_CLOCK_MONOTONIC)); + grpc_pollset_work(&g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC), + gpr_inf_future(GPR_CLOCK_MONOTONIC)); } gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); } @@ -448,7 +450,8 @@ static void test_grpc_fd_change(void) { gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); while (a.cb_that_ran == NULL) { grpc_pollset_worker worker; - grpc_pollset_work(&g_pollset, &worker, gpr_inf_future(GPR_CLOCK_MONOTONIC)); + grpc_pollset_work(&g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC), + gpr_inf_future(GPR_CLOCK_MONOTONIC)); } GPR_ASSERT(a.cb_that_ran == first_read_callback); gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); @@ -467,7 +470,8 @@ static void test_grpc_fd_change(void) { gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); while (b.cb_that_ran == NULL) { grpc_pollset_worker worker; - grpc_pollset_work(&g_pollset, &worker, gpr_inf_future(GPR_CLOCK_MONOTONIC)); + grpc_pollset_work(&g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC), + gpr_inf_future(GPR_CLOCK_MONOTONIC)); } /* Except now we verify that second_read_callback ran instead */ GPR_ASSERT(b.cb_that_ran == second_read_callback); diff --git a/test/core/iomgr/tcp_client_posix_test.c b/test/core/iomgr/tcp_client_posix_test.c index 07bbe1f40251e3e503c09ad19c108fe224b69d4b..dea0b33b8e92ad899780ce580511c653f15cf6fa 100644 --- a/test/core/iomgr/tcp_client_posix_test.c +++ b/test/core/iomgr/tcp_client_posix_test.c @@ -112,7 +112,8 @@ void test_succeeds(void) { while (g_connections_complete == connections_complete_before) { grpc_pollset_worker worker; - grpc_pollset_work(&g_pollset, &worker, GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5)); + grpc_pollset_work(&g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC), + GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5)); } gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); @@ -142,7 +143,8 @@ void test_fails(void) { 
/* wait for the connection callback to finish */ while (g_connections_complete == connections_complete_before) { grpc_pollset_worker worker; - grpc_pollset_work(&g_pollset, &worker, test_deadline()); + grpc_pollset_work(&g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC), + test_deadline()); } gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); @@ -211,7 +213,8 @@ void test_times_out(void) { GPR_ASSERT(g_connections_complete == connections_complete_before + is_after_deadline); } - grpc_pollset_work(&g_pollset, &worker, GRPC_TIMEOUT_MILLIS_TO_DEADLINE(10)); + grpc_pollset_work(&g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC), + GRPC_TIMEOUT_MILLIS_TO_DEADLINE(10)); } gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); diff --git a/test/core/iomgr/tcp_posix_test.c b/test/core/iomgr/tcp_posix_test.c index 17a85ceaec7908f6f5b2a7577e0be5d284e7bfa7..6ad832231f5933ec3c64f01c6d2cf57fc03ce756 100644 --- a/test/core/iomgr/tcp_posix_test.c +++ b/test/core/iomgr/tcp_posix_test.c @@ -187,7 +187,8 @@ static void read_test(ssize_t num_bytes, ssize_t slice_size) { gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); while (state.read_bytes < state.target_read_bytes) { grpc_pollset_worker worker; - grpc_pollset_work(&g_pollset, &worker, deadline); + grpc_pollset_work(&g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC), + deadline); } GPR_ASSERT(state.read_bytes == state.target_read_bytes); gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); @@ -224,7 +225,8 @@ static void large_read_test(ssize_t slice_size) { gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); while (state.read_bytes < state.target_read_bytes) { grpc_pollset_worker worker; - grpc_pollset_work(&g_pollset, &worker, deadline); + grpc_pollset_work(&g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC), + deadline); } GPR_ASSERT(state.read_bytes == state.target_read_bytes); gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); @@ -285,7 +287,8 @@ void drain_socket_blocking(int fd, size_t num_bytes, size_t read_size) { for (;;) { grpc_pollset_worker worker; gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); - grpc_pollset_work(&g_pollset, &worker, GRPC_TIMEOUT_MILLIS_TO_DEADLINE(10)); + grpc_pollset_work(&g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC), + GRPC_TIMEOUT_MILLIS_TO_DEADLINE(10)); gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); do { bytes_read = @@ -365,7 +368,8 @@ static void write_test(ssize_t num_bytes, ssize_t slice_size) { if (state.write_done) { break; } - grpc_pollset_work(&g_pollset, &worker, deadline); + grpc_pollset_work(&g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC), + deadline); } gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); } @@ -422,7 +426,8 @@ static void write_error_test(ssize_t num_bytes, ssize_t slice_size) { if (state.write_done) { break; } - grpc_pollset_work(&g_pollset, &worker, deadline); + grpc_pollset_work(&g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC), + deadline); } gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); break; diff --git a/test/core/iomgr/tcp_server_posix_test.c b/test/core/iomgr/tcp_server_posix_test.c index b82d7c08b1da4d5653ca259534cc7ba3b836c997..29a20cba8e5d657b76f2db1d69c007db5678af55 100644 --- a/test/core/iomgr/tcp_server_posix_test.c +++ b/test/core/iomgr/tcp_server_posix_test.c @@ -137,7 +137,8 @@ static void test_connect(int n) { while (g_nconnects == nconnects_before && gpr_time_cmp(deadline, gpr_now(deadline.clock_type)) > 0) { grpc_pollset_worker worker; - grpc_pollset_work(&g_pollset, &worker, deadline); + grpc_pollset_work(&g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC), + deadline); } gpr_log(GPR_DEBUG, "wait done"); diff --git 
a/test/core/iomgr/udp_server_test.c b/test/core/iomgr/udp_server_test.c index 5a5f99fb94b3f48907500bb6671dde6aafd35a82..471d5b50c7c67b9de31b138ea3b7cb714a20cc2f 100644 --- a/test/core/iomgr/udp_server_test.c +++ b/test/core/iomgr/udp_server_test.c @@ -146,7 +146,8 @@ static void test_receive(int number_of_clients) { while (g_number_of_reads == number_of_reads_before && gpr_time_cmp(deadline, gpr_now(deadline.clock_type)) > 0) { grpc_pollset_worker worker; - grpc_pollset_work(&g_pollset, &worker, deadline); + grpc_pollset_work(&g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC), + deadline); } GPR_ASSERT(g_number_of_reads == number_of_reads_before + 1); close(clifd); diff --git a/test/core/security/oauth2_utils.c b/test/core/security/oauth2_utils.c index 990855ac6ae80030ded756ad69ccf6f894df12f0..7df6fade6b2b04e173d1aa20496e0e0fee651fe3 100644 --- a/test/core/security/oauth2_utils.c +++ b/test/core/security/oauth2_utils.c @@ -85,7 +85,7 @@ char *grpc_test_fetch_oauth2_token_with_credentials(grpc_credentials *creds) { gpr_mu_lock(GRPC_POLLSET_MU(&request.pollset)); while (!request.is_done) { grpc_pollset_worker worker; - grpc_pollset_work(&request.pollset, &worker, + grpc_pollset_work(&request.pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC), gpr_inf_future(GPR_CLOCK_MONOTONIC)); } gpr_mu_unlock(GRPC_POLLSET_MU(&request.pollset)); diff --git a/test/core/security/print_google_default_creds_token.c b/test/core/security/print_google_default_creds_token.c index b4323ab200f20cba06f7f21596c75d3001d2feea..753221cada77b66d90e787f8e44f58e1639700b8 100644 --- a/test/core/security/print_google_default_creds_token.c +++ b/test/core/security/print_google_default_creds_token.c @@ -96,8 +96,8 @@ int main(int argc, char **argv) { gpr_mu_lock(GRPC_POLLSET_MU(&sync.pollset)); while (!sync.is_done) { grpc_pollset_worker worker; - grpc_pollset_work(&sync.pollset, &worker, - gpr_inf_future(GPR_CLOCK_REALTIME)); + grpc_pollset_work(&sync.pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC), + gpr_inf_future(GPR_CLOCK_MONOTONIC)); } gpr_mu_unlock(GRPC_POLLSET_MU(&sync.pollset)); diff --git a/test/core/security/verify_jwt.c b/test/core/security/verify_jwt.c index 5ebde5fbb4fb8cdbd4284098bd4245b373635650..f4432667ee23c0e51a5b14801cb028d25ffa57b3 100644 --- a/test/core/security/verify_jwt.c +++ b/test/core/security/verify_jwt.c @@ -111,7 +111,7 @@ int main(int argc, char **argv) { gpr_mu_lock(GRPC_POLLSET_MU(&sync.pollset)); while (!sync.is_done) { grpc_pollset_worker worker; - grpc_pollset_work(&sync.pollset, &worker, + grpc_pollset_work(&sync.pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC), gpr_inf_future(GPR_CLOCK_MONOTONIC)); } gpr_mu_unlock(GRPC_POLLSET_MU(&sync.pollset)); diff --git a/test/core/util/port_posix.c b/test/core/util/port_posix.c index cec0eebd33f54c50e7693f3794a84ff41dfff791..836e62a5412263c22b4878b89fef6dde52f13db5 100644 --- a/test/core/util/port_posix.c +++ b/test/core/util/port_posix.c @@ -178,7 +178,7 @@ static int pick_port_using_server(char *server) { gpr_mu_lock(GRPC_POLLSET_MU(&pr.pollset)); while (pr.port == -1) { grpc_pollset_worker worker; - grpc_pollset_work(&pr.pollset, &worker, + grpc_pollset_work(&pr.pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC), GRPC_TIMEOUT_SECONDS_TO_DEADLINE(1)); } gpr_mu_unlock(GRPC_POLLSET_MU(&pr.pollset)); diff --git a/test/core/util/reconnect_server.c b/test/core/util/reconnect_server.c index 2a2113338bdb75e565d93d425faa3904ebceb4d5..a06cb50b3a370f2a66bb3fa8682d17df6c12b23a 100644 --- a/test/core/util/reconnect_server.c +++ b/test/core/util/reconnect_server.c @@ -134,7 
+134,8 @@ void reconnect_server_poll(reconnect_server *server, int seconds) { gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), gpr_time_from_seconds(seconds, GPR_TIMESPAN)); gpr_mu_lock(GRPC_POLLSET_MU(&server->pollset)); - grpc_pollset_work(&server->pollset, &worker, deadline); + grpc_pollset_work(&server->pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC), + deadline); gpr_mu_unlock(GRPC_POLLSET_MU(&server->pollset)); } diff --git a/test/cpp/end2end/async_end2end_test.cc b/test/cpp/end2end/async_end2end_test.cc index a30c8412167d111025c9225e3ff05a098400817f..9a9cca0ba144018f0ec175a00fd61e7cecc8ace3 100644 --- a/test/cpp/end2end/async_end2end_test.cc +++ b/test/cpp/end2end/async_end2end_test.cc @@ -56,6 +56,10 @@ #include <grpc/support/thd.h> #include <grpc/support/time.h> +#ifdef GPR_POSIX_SOCKET +#include "src/core/iomgr/pollset_posix.h" +#endif + using grpc::cpp::test::util::EchoRequest; using grpc::cpp::test::util::EchoResponse; using std::chrono::system_clock; @@ -67,8 +71,41 @@ namespace { void* tag(int i) { return (void*)(gpr_intptr)i; } -class Verifier { +#ifdef GPR_POSIX_SOCKET +static int assert_non_blocking_poll(struct pollfd* pfds, nfds_t nfds, + int timeout) { + GPR_ASSERT(timeout == 0); + return poll(pfds, nfds, timeout); +} + +class PollOverride { + public: + PollOverride(grpc_poll_function_type f) { + prev_ = grpc_poll_function; + grpc_poll_function = f; + } + + ~PollOverride() { grpc_poll_function = prev_; } + + private: + grpc_poll_function_type prev_; +}; + +class PollingCheckRegion : public PollOverride { public: + explicit PollingCheckRegion(bool allow_blocking) + : PollOverride(allow_blocking ? poll : assert_non_blocking_poll) {} +}; +#else +class PollingCheckRegion { + public: + explicit PollingCheckRegion(bool allow_blocking) {} +}; +#endif + +class Verifier : public PollingCheckRegion { + public: + explicit Verifier(bool spin) : PollingCheckRegion(!spin), spin_(spin) {} Verifier& Expect(int i, bool expect_ok) { expectations_[tag(i)] = expect_ok; return *this; @@ -78,7 +115,17 @@ class Verifier { while (!expectations_.empty()) { bool ok; void* got_tag; - EXPECT_TRUE(cq->Next(&got_tag, &ok)); + if (spin_) { + for (;;) { + auto r = cq->AsyncNext(&got_tag, &ok, gpr_time_0(GPR_CLOCK_REALTIME)); + if (r == CompletionQueue::TIMEOUT) continue; + if (r == CompletionQueue::GOT_EVENT) break; + gpr_log(GPR_ERROR, "unexpected result from AsyncNext"); + abort(); + } + } else { + EXPECT_TRUE(cq->Next(&got_tag, &ok)); + } auto it = expectations_.find(got_tag); EXPECT_TRUE(it != expectations_.end()); EXPECT_EQ(it->second, ok); @@ -90,14 +137,34 @@ class Verifier { if (expectations_.empty()) { bool ok; void* got_tag; - EXPECT_EQ(cq->AsyncNext(&got_tag, &ok, deadline), - CompletionQueue::TIMEOUT); + if (spin_) { + while (std::chrono::system_clock::now() < deadline) { + EXPECT_EQ( + cq->AsyncNext(&got_tag, &ok, gpr_time_0(GPR_CLOCK_REALTIME)), + CompletionQueue::TIMEOUT); + } + } else { + EXPECT_EQ(cq->AsyncNext(&got_tag, &ok, deadline), + CompletionQueue::TIMEOUT); + } } else { while (!expectations_.empty()) { bool ok; void* got_tag; - EXPECT_EQ(cq->AsyncNext(&got_tag, &ok, deadline), - CompletionQueue::GOT_EVENT); + if (spin_) { + for (;;) { + GPR_ASSERT(std::chrono::system_clock::now() < deadline); + auto r = + cq->AsyncNext(&got_tag, &ok, gpr_time_0(GPR_CLOCK_REALTIME)); + if (r == CompletionQueue::TIMEOUT) continue; + if (r == CompletionQueue::GOT_EVENT) break; + gpr_log(GPR_ERROR, "unexpected result from AsyncNext"); + abort(); + } + } else { + EXPECT_EQ(cq->AsyncNext(&got_tag, &ok, 
deadline), + CompletionQueue::GOT_EVENT); + } auto it = expectations_.find(got_tag); EXPECT_TRUE(it != expectations_.end()); EXPECT_EQ(it->second, ok); @@ -108,9 +175,10 @@ class Verifier { private: std::map<void*, bool> expectations_; + bool spin_; }; -class AsyncEnd2endTest : public ::testing::Test { +class AsyncEnd2endTest : public ::testing::TestWithParam<bool> { protected: AsyncEnd2endTest() {} @@ -160,15 +228,15 @@ class AsyncEnd2endTest : public ::testing::Test { service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(), cq_.get(), tag(2)); - Verifier().Expect(2, true).Verify(cq_.get()); + Verifier(GetParam()).Expect(2, true).Verify(cq_.get()); EXPECT_EQ(send_request.message(), recv_request.message()); send_response.set_message(recv_request.message()); response_writer.Finish(send_response, Status::OK, tag(3)); - Verifier().Expect(3, true).Verify(cq_.get()); + Verifier(GetParam()).Expect(3, true).Verify(cq_.get()); response_reader->Finish(&recv_response, &recv_status, tag(4)); - Verifier().Expect(4, true).Verify(cq_.get()); + Verifier(GetParam()).Expect(4, true).Verify(cq_.get()); EXPECT_EQ(send_response.message(), recv_response.message()); EXPECT_TRUE(recv_status.ok()); @@ -182,18 +250,18 @@ class AsyncEnd2endTest : public ::testing::Test { std::ostringstream server_address_; }; -TEST_F(AsyncEnd2endTest, SimpleRpc) { +TEST_P(AsyncEnd2endTest, SimpleRpc) { ResetStub(); SendRpc(1); } -TEST_F(AsyncEnd2endTest, SequentialRpcs) { +TEST_P(AsyncEnd2endTest, SequentialRpcs) { ResetStub(); SendRpc(10); } // Test a simple RPC using the async version of Next -TEST_F(AsyncEnd2endTest, AsyncNextRpc) { +TEST_P(AsyncEnd2endTest, AsyncNextRpc) { ResetStub(); EchoRequest send_request; @@ -214,30 +282,32 @@ TEST_F(AsyncEnd2endTest, AsyncNextRpc) { std::chrono::system_clock::now()); std::chrono::system_clock::time_point time_limit( std::chrono::system_clock::now() + std::chrono::seconds(10)); - Verifier().Verify(cq_.get(), time_now); - Verifier().Verify(cq_.get(), time_now); + Verifier(GetParam()).Verify(cq_.get(), time_now); + Verifier(GetParam()).Verify(cq_.get(), time_now); service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(), cq_.get(), tag(2)); - Verifier().Expect(2, true).Verify(cq_.get(), time_limit); + Verifier(GetParam()).Expect(2, true).Verify(cq_.get(), time_limit); EXPECT_EQ(send_request.message(), recv_request.message()); send_response.set_message(recv_request.message()); response_writer.Finish(send_response, Status::OK, tag(3)); - Verifier().Expect(3, true).Verify( - cq_.get(), std::chrono::system_clock::time_point::max()); + Verifier(GetParam()) + .Expect(3, true) + .Verify(cq_.get(), std::chrono::system_clock::time_point::max()); response_reader->Finish(&recv_response, &recv_status, tag(4)); - Verifier().Expect(4, true).Verify( - cq_.get(), std::chrono::system_clock::time_point::max()); + Verifier(GetParam()) + .Expect(4, true) + .Verify(cq_.get(), std::chrono::system_clock::time_point::max()); EXPECT_EQ(send_response.message(), recv_response.message()); EXPECT_TRUE(recv_status.ok()); } // Two pings and a final pong. 
-TEST_F(AsyncEnd2endTest, SimpleClientStreaming) { +TEST_P(AsyncEnd2endTest, SimpleClientStreaming) { ResetStub(); EchoRequest send_request; @@ -256,41 +326,41 @@ TEST_F(AsyncEnd2endTest, SimpleClientStreaming) { service_.RequestRequestStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(), tag(2)); - Verifier().Expect(2, true).Expect(1, true).Verify(cq_.get()); + Verifier(GetParam()).Expect(2, true).Expect(1, true).Verify(cq_.get()); cli_stream->Write(send_request, tag(3)); - Verifier().Expect(3, true).Verify(cq_.get()); + Verifier(GetParam()).Expect(3, true).Verify(cq_.get()); srv_stream.Read(&recv_request, tag(4)); - Verifier().Expect(4, true).Verify(cq_.get()); + Verifier(GetParam()).Expect(4, true).Verify(cq_.get()); EXPECT_EQ(send_request.message(), recv_request.message()); cli_stream->Write(send_request, tag(5)); - Verifier().Expect(5, true).Verify(cq_.get()); + Verifier(GetParam()).Expect(5, true).Verify(cq_.get()); srv_stream.Read(&recv_request, tag(6)); - Verifier().Expect(6, true).Verify(cq_.get()); + Verifier(GetParam()).Expect(6, true).Verify(cq_.get()); EXPECT_EQ(send_request.message(), recv_request.message()); cli_stream->WritesDone(tag(7)); - Verifier().Expect(7, true).Verify(cq_.get()); + Verifier(GetParam()).Expect(7, true).Verify(cq_.get()); srv_stream.Read(&recv_request, tag(8)); - Verifier().Expect(8, false).Verify(cq_.get()); + Verifier(GetParam()).Expect(8, false).Verify(cq_.get()); send_response.set_message(recv_request.message()); srv_stream.Finish(send_response, Status::OK, tag(9)); - Verifier().Expect(9, true).Verify(cq_.get()); + Verifier(GetParam()).Expect(9, true).Verify(cq_.get()); cli_stream->Finish(&recv_status, tag(10)); - Verifier().Expect(10, true).Verify(cq_.get()); + Verifier(GetParam()).Expect(10, true).Verify(cq_.get()); EXPECT_EQ(send_response.message(), recv_response.message()); EXPECT_TRUE(recv_status.ok()); } // One ping, two pongs. 
-TEST_F(AsyncEnd2endTest, SimpleServerStreaming) { +TEST_P(AsyncEnd2endTest, SimpleServerStreaming) { ResetStub(); EchoRequest send_request; @@ -309,38 +379,38 @@ TEST_F(AsyncEnd2endTest, SimpleServerStreaming) { service_.RequestResponseStream(&srv_ctx, &recv_request, &srv_stream, cq_.get(), cq_.get(), tag(2)); - Verifier().Expect(1, true).Expect(2, true).Verify(cq_.get()); + Verifier(GetParam()).Expect(1, true).Expect(2, true).Verify(cq_.get()); EXPECT_EQ(send_request.message(), recv_request.message()); send_response.set_message(recv_request.message()); srv_stream.Write(send_response, tag(3)); - Verifier().Expect(3, true).Verify(cq_.get()); + Verifier(GetParam()).Expect(3, true).Verify(cq_.get()); cli_stream->Read(&recv_response, tag(4)); - Verifier().Expect(4, true).Verify(cq_.get()); + Verifier(GetParam()).Expect(4, true).Verify(cq_.get()); EXPECT_EQ(send_response.message(), recv_response.message()); srv_stream.Write(send_response, tag(5)); - Verifier().Expect(5, true).Verify(cq_.get()); + Verifier(GetParam()).Expect(5, true).Verify(cq_.get()); cli_stream->Read(&recv_response, tag(6)); - Verifier().Expect(6, true).Verify(cq_.get()); + Verifier(GetParam()).Expect(6, true).Verify(cq_.get()); EXPECT_EQ(send_response.message(), recv_response.message()); srv_stream.Finish(Status::OK, tag(7)); - Verifier().Expect(7, true).Verify(cq_.get()); + Verifier(GetParam()).Expect(7, true).Verify(cq_.get()); cli_stream->Read(&recv_response, tag(8)); - Verifier().Expect(8, false).Verify(cq_.get()); + Verifier(GetParam()).Expect(8, false).Verify(cq_.get()); cli_stream->Finish(&recv_status, tag(9)); - Verifier().Expect(9, true).Verify(cq_.get()); + Verifier(GetParam()).Expect(9, true).Verify(cq_.get()); EXPECT_TRUE(recv_status.ok()); } // One ping, one pong. -TEST_F(AsyncEnd2endTest, SimpleBidiStreaming) { +TEST_P(AsyncEnd2endTest, SimpleBidiStreaming) { ResetStub(); EchoRequest send_request; @@ -359,40 +429,40 @@ TEST_F(AsyncEnd2endTest, SimpleBidiStreaming) { service_.RequestBidiStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(), tag(2)); - Verifier().Expect(1, true).Expect(2, true).Verify(cq_.get()); + Verifier(GetParam()).Expect(1, true).Expect(2, true).Verify(cq_.get()); cli_stream->Write(send_request, tag(3)); - Verifier().Expect(3, true).Verify(cq_.get()); + Verifier(GetParam()).Expect(3, true).Verify(cq_.get()); srv_stream.Read(&recv_request, tag(4)); - Verifier().Expect(4, true).Verify(cq_.get()); + Verifier(GetParam()).Expect(4, true).Verify(cq_.get()); EXPECT_EQ(send_request.message(), recv_request.message()); send_response.set_message(recv_request.message()); srv_stream.Write(send_response, tag(5)); - Verifier().Expect(5, true).Verify(cq_.get()); + Verifier(GetParam()).Expect(5, true).Verify(cq_.get()); cli_stream->Read(&recv_response, tag(6)); - Verifier().Expect(6, true).Verify(cq_.get()); + Verifier(GetParam()).Expect(6, true).Verify(cq_.get()); EXPECT_EQ(send_response.message(), recv_response.message()); cli_stream->WritesDone(tag(7)); - Verifier().Expect(7, true).Verify(cq_.get()); + Verifier(GetParam()).Expect(7, true).Verify(cq_.get()); srv_stream.Read(&recv_request, tag(8)); - Verifier().Expect(8, false).Verify(cq_.get()); + Verifier(GetParam()).Expect(8, false).Verify(cq_.get()); srv_stream.Finish(Status::OK, tag(9)); - Verifier().Expect(9, true).Verify(cq_.get()); + Verifier(GetParam()).Expect(9, true).Verify(cq_.get()); cli_stream->Finish(&recv_status, tag(10)); - Verifier().Expect(10, true).Verify(cq_.get()); + Verifier(GetParam()).Expect(10, true).Verify(cq_.get()); 
EXPECT_TRUE(recv_status.ok()); } // Metadata tests -TEST_F(AsyncEnd2endTest, ClientInitialMetadataRpc) { +TEST_P(AsyncEnd2endTest, ClientInitialMetadataRpc) { ResetStub(); EchoRequest send_request; @@ -416,7 +486,7 @@ TEST_F(AsyncEnd2endTest, ClientInitialMetadataRpc) { service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(), cq_.get(), tag(2)); - Verifier().Expect(2, true).Verify(cq_.get()); + Verifier(GetParam()).Expect(2, true).Verify(cq_.get()); EXPECT_EQ(send_request.message(), recv_request.message()); auto client_initial_metadata = srv_ctx.client_metadata(); EXPECT_EQ(meta1.second, client_initial_metadata.find(meta1.first)->second); @@ -426,16 +496,16 @@ TEST_F(AsyncEnd2endTest, ClientInitialMetadataRpc) { send_response.set_message(recv_request.message()); response_writer.Finish(send_response, Status::OK, tag(3)); - Verifier().Expect(3, true).Verify(cq_.get()); + Verifier(GetParam()).Expect(3, true).Verify(cq_.get()); response_reader->Finish(&recv_response, &recv_status, tag(4)); - Verifier().Expect(4, true).Verify(cq_.get()); + Verifier(GetParam()).Expect(4, true).Verify(cq_.get()); EXPECT_EQ(send_response.message(), recv_response.message()); EXPECT_TRUE(recv_status.ok()); } -TEST_F(AsyncEnd2endTest, ServerInitialMetadataRpc) { +TEST_P(AsyncEnd2endTest, ServerInitialMetadataRpc) { ResetStub(); EchoRequest send_request; @@ -457,15 +527,15 @@ TEST_F(AsyncEnd2endTest, ServerInitialMetadataRpc) { service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(), cq_.get(), tag(2)); - Verifier().Expect(2, true).Verify(cq_.get()); + Verifier(GetParam()).Expect(2, true).Verify(cq_.get()); EXPECT_EQ(send_request.message(), recv_request.message()); srv_ctx.AddInitialMetadata(meta1.first, meta1.second); srv_ctx.AddInitialMetadata(meta2.first, meta2.second); response_writer.SendInitialMetadata(tag(3)); - Verifier().Expect(3, true).Verify(cq_.get()); + Verifier(GetParam()).Expect(3, true).Verify(cq_.get()); response_reader->ReadInitialMetadata(tag(4)); - Verifier().Expect(4, true).Verify(cq_.get()); + Verifier(GetParam()).Expect(4, true).Verify(cq_.get()); auto server_initial_metadata = cli_ctx.GetServerInitialMetadata(); EXPECT_EQ(meta1.second, server_initial_metadata.find(meta1.first)->second); EXPECT_EQ(meta2.second, server_initial_metadata.find(meta2.first)->second); @@ -473,16 +543,16 @@ TEST_F(AsyncEnd2endTest, ServerInitialMetadataRpc) { send_response.set_message(recv_request.message()); response_writer.Finish(send_response, Status::OK, tag(5)); - Verifier().Expect(5, true).Verify(cq_.get()); + Verifier(GetParam()).Expect(5, true).Verify(cq_.get()); response_reader->Finish(&recv_response, &recv_status, tag(6)); - Verifier().Expect(6, true).Verify(cq_.get()); + Verifier(GetParam()).Expect(6, true).Verify(cq_.get()); EXPECT_EQ(send_response.message(), recv_response.message()); EXPECT_TRUE(recv_status.ok()); } -TEST_F(AsyncEnd2endTest, ServerTrailingMetadataRpc) { +TEST_P(AsyncEnd2endTest, ServerTrailingMetadataRpc) { ResetStub(); EchoRequest send_request; @@ -504,20 +574,20 @@ TEST_F(AsyncEnd2endTest, ServerTrailingMetadataRpc) { service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(), cq_.get(), tag(2)); - Verifier().Expect(2, true).Verify(cq_.get()); + Verifier(GetParam()).Expect(2, true).Verify(cq_.get()); EXPECT_EQ(send_request.message(), recv_request.message()); response_writer.SendInitialMetadata(tag(3)); - Verifier().Expect(3, true).Verify(cq_.get()); + Verifier(GetParam()).Expect(3, true).Verify(cq_.get()); 
send_response.set_message(recv_request.message()); srv_ctx.AddTrailingMetadata(meta1.first, meta1.second); srv_ctx.AddTrailingMetadata(meta2.first, meta2.second); response_writer.Finish(send_response, Status::OK, tag(4)); - Verifier().Expect(4, true).Verify(cq_.get()); + Verifier(GetParam()).Expect(4, true).Verify(cq_.get()); response_reader->Finish(&recv_response, &recv_status, tag(5)); - Verifier().Expect(5, true).Verify(cq_.get()); + Verifier(GetParam()).Expect(5, true).Verify(cq_.get()); EXPECT_EQ(send_response.message(), recv_response.message()); EXPECT_TRUE(recv_status.ok()); auto server_trailing_metadata = cli_ctx.GetServerTrailingMetadata(); @@ -526,7 +596,7 @@ TEST_F(AsyncEnd2endTest, ServerTrailingMetadataRpc) { EXPECT_EQ(static_cast<size_t>(2), server_trailing_metadata.size()); } -TEST_F(AsyncEnd2endTest, MetadataRpc) { +TEST_P(AsyncEnd2endTest, MetadataRpc) { ResetStub(); EchoRequest send_request; @@ -563,7 +633,7 @@ TEST_F(AsyncEnd2endTest, MetadataRpc) { service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(), cq_.get(), tag(2)); - Verifier().Expect(2, true).Verify(cq_.get()); + Verifier(GetParam()).Expect(2, true).Verify(cq_.get()); EXPECT_EQ(send_request.message(), recv_request.message()); auto client_initial_metadata = srv_ctx.client_metadata(); EXPECT_EQ(meta1.second, client_initial_metadata.find(meta1.first)->second); @@ -573,9 +643,9 @@ TEST_F(AsyncEnd2endTest, MetadataRpc) { srv_ctx.AddInitialMetadata(meta3.first, meta3.second); srv_ctx.AddInitialMetadata(meta4.first, meta4.second); response_writer.SendInitialMetadata(tag(3)); - Verifier().Expect(3, true).Verify(cq_.get()); + Verifier(GetParam()).Expect(3, true).Verify(cq_.get()); response_reader->ReadInitialMetadata(tag(4)); - Verifier().Expect(4, true).Verify(cq_.get()); + Verifier(GetParam()).Expect(4, true).Verify(cq_.get()); auto server_initial_metadata = cli_ctx.GetServerInitialMetadata(); EXPECT_EQ(meta3.second, server_initial_metadata.find(meta3.first)->second); EXPECT_EQ(meta4.second, server_initial_metadata.find(meta4.first)->second); @@ -586,10 +656,10 @@ TEST_F(AsyncEnd2endTest, MetadataRpc) { srv_ctx.AddTrailingMetadata(meta6.first, meta6.second); response_writer.Finish(send_response, Status::OK, tag(5)); - Verifier().Expect(5, true).Verify(cq_.get()); + Verifier(GetParam()).Expect(5, true).Verify(cq_.get()); response_reader->Finish(&recv_response, &recv_status, tag(6)); - Verifier().Expect(6, true).Verify(cq_.get()); + Verifier(GetParam()).Expect(6, true).Verify(cq_.get()); EXPECT_EQ(send_response.message(), recv_response.message()); EXPECT_TRUE(recv_status.ok()); auto server_trailing_metadata = cli_ctx.GetServerTrailingMetadata(); @@ -599,7 +669,7 @@ TEST_F(AsyncEnd2endTest, MetadataRpc) { } // Server uses AsyncNotifyWhenDone API to check for cancellation -TEST_F(AsyncEnd2endTest, ServerCheckCancellation) { +TEST_P(AsyncEnd2endTest, ServerCheckCancellation) { ResetStub(); EchoRequest send_request; @@ -620,21 +690,21 @@ TEST_F(AsyncEnd2endTest, ServerCheckCancellation) { service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(), cq_.get(), tag(2)); - Verifier().Expect(2, true).Verify(cq_.get()); + Verifier(GetParam()).Expect(2, true).Verify(cq_.get()); EXPECT_EQ(send_request.message(), recv_request.message()); cli_ctx.TryCancel(); - Verifier().Expect(5, true).Verify(cq_.get()); + Verifier(GetParam()).Expect(5, true).Verify(cq_.get()); EXPECT_TRUE(srv_ctx.IsCancelled()); response_reader->Finish(&recv_response, &recv_status, tag(4)); - Verifier().Expect(4, 
false).Verify(cq_.get()); + Verifier(GetParam()).Expect(4, false).Verify(cq_.get()); EXPECT_EQ(StatusCode::CANCELLED, recv_status.error_code()); } // Server uses AsyncNotifyWhenDone API to check for normal finish -TEST_F(AsyncEnd2endTest, ServerCheckDone) { +TEST_P(AsyncEnd2endTest, ServerCheckDone) { ResetStub(); EchoRequest send_request; @@ -655,23 +725,23 @@ TEST_F(AsyncEnd2endTest, ServerCheckDone) { service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(), cq_.get(), tag(2)); - Verifier().Expect(2, true).Verify(cq_.get()); + Verifier(GetParam()).Expect(2, true).Verify(cq_.get()); EXPECT_EQ(send_request.message(), recv_request.message()); send_response.set_message(recv_request.message()); response_writer.Finish(send_response, Status::OK, tag(3)); - Verifier().Expect(3, true).Verify(cq_.get()); - Verifier().Expect(5, true).Verify(cq_.get()); + Verifier(GetParam()).Expect(3, true).Verify(cq_.get()); + Verifier(GetParam()).Expect(5, true).Verify(cq_.get()); EXPECT_FALSE(srv_ctx.IsCancelled()); response_reader->Finish(&recv_response, &recv_status, tag(4)); - Verifier().Expect(4, true).Verify(cq_.get()); + Verifier(GetParam()).Expect(4, true).Verify(cq_.get()); EXPECT_EQ(send_response.message(), recv_response.message()); EXPECT_TRUE(recv_status.ok()); } -TEST_F(AsyncEnd2endTest, UnimplementedRpc) { +TEST_P(AsyncEnd2endTest, UnimplementedRpc) { std::shared_ptr<ChannelInterface> channel = CreateChannel( server_address_.str(), InsecureCredentials(), ChannelArguments()); std::unique_ptr<grpc::cpp::test::util::UnimplementedService::Stub> stub; @@ -687,12 +757,15 @@ TEST_F(AsyncEnd2endTest, UnimplementedRpc) { stub->AsyncUnimplemented(&cli_ctx, send_request, cq_.get())); response_reader->Finish(&recv_response, &recv_status, tag(4)); - Verifier().Expect(4, false).Verify(cq_.get()); + Verifier(GetParam()).Expect(4, false).Verify(cq_.get()); EXPECT_EQ(StatusCode::UNIMPLEMENTED, recv_status.error_code()); EXPECT_EQ("", recv_status.error_message()); } +INSTANTIATE_TEST_CASE_P(AsyncEnd2end, AsyncEnd2endTest, + ::testing::Values(false, true)); + } // namespace } // namespace testing } // namespace grpc
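Not part of the patch: two of the moving pieces above fit together in a way that is easy to miss. grpc_pollset_work() no longer checks the deadline or reports expiry, so grpc_completion_queue_next()/_pluck() now compute `now`, compare it against the deadline themselves, and use a first_loop flag to guarantee one poll pass even when the deadline has already passed; that is what lets the new spin-mode Verifier call AsyncNext() with a zero deadline, under the grpc_poll_function override that asserts timeout == 0, and still make progress without blocking. Callers outside the completion queue follow the same pattern as the updated tests, roughly as sketched below (names are illustrative, and the internal header is assumed to be available in-tree):

#include <grpc/support/time.h>
#include "src/core/iomgr/pollset.h"  // internal header, assumed available in-tree

// Polls `pollset` until *done becomes true or `deadline` passes.
// GRPC_POLLSET_MU(pollset) must be held on entry, as grpc_pollset_work requires.
void PollUntilDone(grpc_pollset* pollset, const bool* done,
                   gpr_timespec deadline) {
  while (!*done &&
         gpr_time_cmp(gpr_now(GPR_CLOCK_MONOTONIC), deadline) < 0) {
    grpc_pollset_worker worker;
    // `now` is recomputed each iteration; the callee only tries not to block
    // past `deadline` and no longer returns whether it expired.
    grpc_pollset_work(pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC), deadline);
  }
}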