diff --git a/test/cpp/end2end/async_end2end_test.cc b/test/cpp/end2end/async_end2end_test.cc index 7698e86cf410c2ce954f29a20755c5927894ceda..f03aa9fbd00a93115483705e7d2b2c3f8bd3a494 100644 --- a/test/cpp/end2end/async_end2end_test.cc +++ b/test/cpp/end2end/async_end2end_test.cc @@ -65,9 +65,7 @@ namespace testing { namespace { -void* tag(int i) { - return (void*)(gpr_intptr)i; -} +void* tag(int i) { return (void*)(gpr_intptr)i; } void verify_ok(CompletionQueue* cq, int i, bool expect_ok) { bool ok; @@ -109,18 +107,10 @@ class AsyncEnd2endTest : public ::testing::Test { stub_ = std::move(grpc::cpp::test::util::TestService::NewStub(channel)); } - void server_ok(int i) { - verify_ok(&srv_cq_, i, true); - } - void client_ok(int i) { - verify_ok(&cli_cq_, i , true); - } - void server_fail(int i) { - verify_ok(&srv_cq_, i, false); - } - void client_fail(int i) { - verify_ok(&cli_cq_, i, false); - } + void server_ok(int i) { verify_ok(&srv_cq_, i, true); } + void client_ok(int i) { verify_ok(&cli_cq_, i, true); } + void server_fail(int i) { verify_ok(&srv_cq_, i, false); } + void client_fail(int i) { verify_ok(&cli_cq_, i, false); } void SendRpc(int num_rpcs) { for (int i = 0; i < num_rpcs; i++) { @@ -135,12 +125,11 @@ class AsyncEnd2endTest : public ::testing::Test { grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx); send_request.set_message("Hello"); - std::unique_ptr<ClientAsyncResponseReader<EchoResponse> > - response_reader(stub_->AsyncEcho( - &cli_ctx, send_request, &cli_cq_, tag(1))); + std::unique_ptr<ClientAsyncResponseReader<EchoResponse> > response_reader( + stub_->AsyncEcho(&cli_ctx, send_request, &cli_cq_, tag(1))); - service_.RequestEcho( - &srv_ctx, &recv_request, &response_writer, &srv_cq_, tag(2)); + service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, &srv_cq_, + tag(2)); server_ok(2); EXPECT_EQ(send_request.message(), recv_request.message()); @@ -193,8 +182,7 @@ TEST_F(AsyncEnd2endTest, SimpleClientStreaming) { std::unique_ptr<ClientAsyncWriter<EchoRequest> > cli_stream( stub_->AsyncRequestStream(&cli_ctx, &recv_response, &cli_cq_, tag(1))); - service_.RequestRequestStream( - &srv_ctx, &srv_stream, &srv_cq_, tag(2)); + service_.RequestRequestStream(&srv_ctx, &srv_stream, &srv_cq_, tag(2)); server_ok(2); client_ok(1); @@ -247,8 +235,8 @@ TEST_F(AsyncEnd2endTest, SimpleServerStreaming) { std::unique_ptr<ClientAsyncReader<EchoResponse> > cli_stream( stub_->AsyncResponseStream(&cli_ctx, send_request, &cli_cq_, tag(1))); - service_.RequestResponseStream( - &srv_ctx, &recv_request, &srv_stream, &srv_cq_, tag(2)); + service_.RequestResponseStream(&srv_ctx, &recv_request, &srv_stream, &srv_cq_, + tag(2)); server_ok(2); client_ok(1); @@ -298,8 +286,7 @@ TEST_F(AsyncEnd2endTest, SimpleBidiStreaming) { std::unique_ptr<ClientAsyncReaderWriter<EchoRequest, EchoResponse> > cli_stream(stub_->AsyncBidiStream(&cli_ctx, &cli_cq_, tag(1))); - service_.RequestBidiStream( - &srv_ctx, &srv_stream, &srv_cq_, tag(2)); + service_.RequestBidiStream(&srv_ctx, &srv_stream, &srv_cq_, tag(2)); server_ok(2); client_ok(1); @@ -357,8 +344,8 @@ TEST_F(AsyncEnd2endTest, ClientInitialMetadataRpc) { std::unique_ptr<ClientAsyncResponseReader<EchoResponse> > response_reader( stub_->AsyncEcho(&cli_ctx, send_request, &cli_cq_, tag(1))); - service_.RequestEcho( - &srv_ctx, &recv_request, &response_writer, &srv_cq_, tag(2)); + service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, &srv_cq_, + tag(2)); server_ok(2); EXPECT_EQ(send_request.message(), recv_request.message()); auto client_initial_metadata = srv_ctx.client_metadata(); @@ -399,8 +386,8 @@ TEST_F(AsyncEnd2endTest, ServerInitialMetadataRpc) { std::unique_ptr<ClientAsyncResponseReader<EchoResponse> > response_reader( stub_->AsyncEcho(&cli_ctx, send_request, &cli_cq_, tag(1))); - service_.RequestEcho( - &srv_ctx, &recv_request, &response_writer, &srv_cq_, tag(2)); + service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, &srv_cq_, + tag(2)); server_ok(2); EXPECT_EQ(send_request.message(), recv_request.message()); srv_ctx.AddInitialMetadata(meta1.first, meta1.second); @@ -447,8 +434,8 @@ TEST_F(AsyncEnd2endTest, ServerTrailingMetadataRpc) { std::unique_ptr<ClientAsyncResponseReader<EchoResponse> > response_reader( stub_->AsyncEcho(&cli_ctx, send_request, &cli_cq_, tag(1))); - service_.RequestEcho( - &srv_ctx, &recv_request, &response_writer, &srv_cq_, tag(2)); + service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, &srv_cq_, + tag(2)); server_ok(2); EXPECT_EQ(send_request.message(), recv_request.message()); response_writer.SendInitialMetadata(tag(3)); @@ -462,7 +449,6 @@ TEST_F(AsyncEnd2endTest, ServerTrailingMetadataRpc) { server_ok(4); - response_reader->Finish(&recv_response, &recv_status, tag(5)); client_ok(5); EXPECT_EQ(send_response.message(), recv_response.message()); @@ -491,10 +477,12 @@ TEST_F(AsyncEnd2endTest, MetadataRpc) { std::pair<grpc::string, grpc::string> meta2( "key2-bin", {"\xc0\xc1\xc2\xc3\xc4\xc5\xc6\xc7\xc8\xc9\xca\xcb\xcc", 13}); std::pair<grpc::string, grpc::string> meta3("key3", "val3"); - std::pair<grpc::string, grpc::string> meta6("key4-bin", + std::pair<grpc::string, grpc::string> meta6( + "key4-bin", {"\x10\x11\x12\x13\x14\x15\x16\x17\x18\x19\x1a\x1b\x1c\x1d", 14}); std::pair<grpc::string, grpc::string> meta5("key5", "val5"); - std::pair<grpc::string, grpc::string> meta4("key6-bin", + std::pair<grpc::string, grpc::string> meta4( + "key6-bin", {"\xe0\xe1\xe2\xe3\xe4\xe5\xe6\xe7\xe8\xe9\xea\xeb\xec\xed\xee", 15}); cli_ctx.AddMetadata(meta1.first, meta1.second); @@ -503,8 +491,8 @@ TEST_F(AsyncEnd2endTest, MetadataRpc) { std::unique_ptr<ClientAsyncResponseReader<EchoResponse> > response_reader( stub_->AsyncEcho(&cli_ctx, send_request, &cli_cq_, tag(1))); - service_.RequestEcho( - &srv_ctx, &recv_request, &response_writer, &srv_cq_, tag(2)); + service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, &srv_cq_, + tag(2)); server_ok(2); EXPECT_EQ(send_request.message(), recv_request.message()); auto client_initial_metadata = srv_ctx.client_metadata(); @@ -531,7 +519,6 @@ TEST_F(AsyncEnd2endTest, MetadataRpc) { server_ok(5); - response_reader->Finish(&recv_response, &recv_status, tag(6)); client_ok(6); EXPECT_EQ(send_response.message(), recv_response.message()); diff --git a/test/cpp/qps/client_async.cc b/test/cpp/qps/client_async.cc index 40344914dc0133bf3f916407874efa4772d0f911..0c48f08ea021fac87074c3530f4bf1dde71794c7 100644 --- a/test/cpp/qps/client_async.cc +++ b/test/cpp/qps/client_async.cc @@ -84,8 +84,8 @@ using grpc::testing::TestService; // In some distros, gflags is in the namespace google, and in some others, // in gflags. This hack is enabling us to find both. -namespace google { } -namespace gflags { } +namespace google {} +namespace gflags {} using namespace google; using namespace gflags; @@ -94,63 +94,67 @@ static double now() { return 1e9 * tv.tv_sec + tv.tv_nsec; } - class ClientRpcContext { - public: - ClientRpcContext() {} - virtual ~ClientRpcContext() {} - virtual bool operator()() = 0; // do next state, return false if steps done - static void *tag(ClientRpcContext *c) {return reinterpret_cast<void *>(c);} - static ClientRpcContext *detag(void *t) { - return reinterpret_cast<ClientRpcContext *>(t); - } - virtual void report_stats(gpr_histogram *hist) = 0; - }; - template <class RequestType, class ResponseType> - class ClientRpcContextUnaryImpl : public ClientRpcContext { - public: - ClientRpcContextUnaryImpl(const RequestType& req, - std::function<std::unique_ptr<grpc::ClientAsyncResponseReader< - ResponseType>>(grpc::ClientContext *, - const RequestType&, void *)> start_req, - std::function<void(grpc::Status, ResponseType *)> on_done): - context_(), req_(req), response_(), - next_state_(&ClientRpcContextUnaryImpl::ReqSent), - callback_(on_done), - start_(now()), - response_reader_(start_req(&context_, req_, - ClientRpcContext::tag(this))) { - } - ~ClientRpcContextUnaryImpl() override {} - bool operator()() override {return (this->*next_state_)();} - void report_stats(gpr_histogram *hist) override { - gpr_histogram_add(hist, now()-start_); - } - private: - bool ReqSent() { - next_state_ = &ClientRpcContextUnaryImpl::RespDone; - response_reader_->Finish(&response_, &status_, ClientRpcContext::tag(this)); - return true; - } - bool RespDone() { - next_state_ = &ClientRpcContextUnaryImpl::DoCallBack; - return false; - } - bool DoCallBack() { - callback_(status_, &response_); - return false; - } - grpc::ClientContext context_; - RequestType req_; - ResponseType response_; - bool (ClientRpcContextUnaryImpl::*next_state_)(); - std::function<void(grpc::Status, ResponseType *)> callback_; - grpc::Status status_; - double start_; - std::unique_ptr<grpc::ClientAsyncResponseReader<ResponseType>> response_reader_; - }; +class ClientRpcContext { + public: + ClientRpcContext() {} + virtual ~ClientRpcContext() {} + virtual bool operator()() = 0; // do next state, return false if steps done + static void *tag(ClientRpcContext *c) { return reinterpret_cast<void *>(c); } + static ClientRpcContext *detag(void *t) { + return reinterpret_cast<ClientRpcContext *>(t); + } + virtual void report_stats(gpr_histogram *hist) = 0; +}; +template <class RequestType, class ResponseType> +class ClientRpcContextUnaryImpl : public ClientRpcContext { + public: + ClientRpcContextUnaryImpl( + const RequestType &req, + std::function< + std::unique_ptr<grpc::ClientAsyncResponseReader<ResponseType>>( + grpc::ClientContext *, const RequestType &, void *)> start_req, + std::function<void(grpc::Status, ResponseType *)> on_done) + : context_(), + req_(req), + response_(), + next_state_(&ClientRpcContextUnaryImpl::ReqSent), + callback_(on_done), + start_(now()), + response_reader_( + start_req(&context_, req_, ClientRpcContext::tag(this))) {} + ~ClientRpcContextUnaryImpl() override {} + bool operator()() override { return (this->*next_state_)(); } + void report_stats(gpr_histogram *hist) override { + gpr_histogram_add(hist, now() - start_); + } + + private: + bool ReqSent() { + next_state_ = &ClientRpcContextUnaryImpl::RespDone; + response_reader_->Finish(&response_, &status_, ClientRpcContext::tag(this)); + return true; + } + bool RespDone() { + next_state_ = &ClientRpcContextUnaryImpl::DoCallBack; + return false; + } + bool DoCallBack() { + callback_(status_, &response_); + return false; + } + grpc::ClientContext context_; + RequestType req_; + ResponseType response_; + bool (ClientRpcContextUnaryImpl::*next_state_)(); + std::function<void(grpc::Status, ResponseType *)> callback_; + grpc::Status status_; + double start_; + std::unique_ptr<grpc::ClientAsyncResponseReader<ResponseType>> + response_reader_; +}; static void RunTest(const int client_threads, const int client_channels, - const int num_rpcs, const int payload_size) { + const int num_rpcs, const int payload_size) { gpr_log(GPR_INFO, "QPS test with parameters\n" "enable_ssl = %d\n" @@ -197,71 +201,65 @@ static void RunTest(const int client_threads, const int client_channels, grpc_profiler_start("qps_client_async.prof"); auto CheckDone = [=](grpc::Status s, SimpleResponse *response) { - GPR_ASSERT(s.IsOk() && - (response->payload().type() == - grpc::testing::PayloadType::COMPRESSABLE) && - (response->payload().body().length() == - static_cast<size_t>(payload_size))); + GPR_ASSERT(s.IsOk() && (response->payload().type() == + grpc::testing::PayloadType::COMPRESSABLE) && + (response->payload().body().length() == + static_cast<size_t>(payload_size))); }; - + for (int i = 0; i < client_threads; i++) { gpr_histogram *hist = gpr_histogram_create(0.01, 60e9); GPR_ASSERT(hist != NULL); thread_stats[i] = hist; - threads.push_back( - std::thread([hist, client_threads, client_channels, num_rpcs, - payload_size, &channels, &CheckDone](int channel_num) { - using namespace std::placeholders; - SimpleRequest request; - request.set_response_type( - grpc::testing::PayloadType::COMPRESSABLE); - request.set_response_size(payload_size); - - grpc::CompletionQueue cli_cq; - - int rpcs_sent=0; - while (rpcs_sent < num_rpcs) { - rpcs_sent++; - TestService::Stub *stub = - channels[channel_num].get_stub(); - grpc::ClientContext context; - auto start_req = std::bind(&TestService::Stub::AsyncUnaryCall, - stub, _1, _2, &cli_cq, _3); - new ClientRpcContextUnaryImpl<SimpleRequest, - SimpleResponse>(request, - start_req, - CheckDone); - void *got_tag; - bool ok; - - // Need to call 2 next for every 1 RPC (1 for req done, 1 for resp done) - cli_cq.Next(&got_tag,&ok); - if (!ok) - break; - ClientRpcContext *ctx = ClientRpcContext::detag(got_tag); - if ((*ctx)() == false) { - // call the callback and then delete it - (*ctx)(); - delete ctx; - } - cli_cq.Next(&got_tag,&ok); - if (!ok) - break; - ctx = ClientRpcContext::detag(got_tag); - if ((*ctx)() == false) { - // call the callback and then delete it - ctx->report_stats(hist); - (*ctx)(); - delete ctx; - } - // Now do runtime round-robin assignment of the next - // channel number - channel_num += client_threads; - channel_num %= client_channels; - } - }, - i % client_channels)); + threads.push_back(std::thread( + [hist, client_threads, client_channels, num_rpcs, payload_size, + &channels, &CheckDone](int channel_num) { + using namespace std::placeholders; + SimpleRequest request; + request.set_response_type(grpc::testing::PayloadType::COMPRESSABLE); + request.set_response_size(payload_size); + + grpc::CompletionQueue cli_cq; + + int rpcs_sent = 0; + while (rpcs_sent < num_rpcs) { + rpcs_sent++; + TestService::Stub *stub = channels[channel_num].get_stub(); + grpc::ClientContext context; + auto start_req = std::bind(&TestService::Stub::AsyncUnaryCall, stub, + _1, _2, &cli_cq, _3); + new ClientRpcContextUnaryImpl<SimpleRequest, SimpleResponse>( + request, start_req, CheckDone); + void *got_tag; + bool ok; + + // Need to call 2 next for every 1 RPC (1 for req done, 1 for resp + // done) + cli_cq.Next(&got_tag, &ok); + if (!ok) break; + ClientRpcContext *ctx = ClientRpcContext::detag(got_tag); + if ((*ctx)() == false) { + // call the callback and then delete it + (*ctx)(); + delete ctx; + } + cli_cq.Next(&got_tag, &ok); + if (!ok) break; + ctx = ClientRpcContext::detag(got_tag); + if ((*ctx)() == false) { + // call the callback and then delete it + ctx->report_stats(hist); + (*ctx)(); + delete ctx; + } + // Now do runtime round-robin assignment of the next + // channel number + channel_num += client_threads; + channel_num %= client_channels; + } + }, + i % client_channels)); } gpr_histogram *hist = gpr_histogram_create(0.01, 60e9); diff --git a/test/cpp/qps/server_async.cc b/test/cpp/qps/server_async.cc index fec17ea79fa28b9645368a06e7bbd5a3b7b3c904..b68c2c954833cbc7bffb286ce7ae3d357a06c571 100644 --- a/test/cpp/qps/server_async.cc +++ b/test/cpp/qps/server_async.cc @@ -124,10 +124,12 @@ class AsyncQpsServerTest { std::bind(&TestService::AsyncService::RequestCollectServerStats, &async_service_, _1, _2, _3, &srv_cq_, _4); for (int i = 0; i < 100; i++) { - contexts_.push_front(new ServerRpcContextUnaryImpl<SimpleRequest, - SimpleResponse>(request_unary_, UnaryCall)); - contexts_.push_front(new ServerRpcContextUnaryImpl<StatsRequest, - ServerStats>(request_stats_, CollectServerStats)); + contexts_.push_front( + new ServerRpcContextUnaryImpl<SimpleRequest, SimpleResponse>( + request_unary_, UnaryCall)); + contexts_.push_front( + new ServerRpcContextUnaryImpl<StatsRequest, ServerStats>( + request_stats_, CollectServerStats)); } } ~AsyncQpsServerTest() { @@ -151,12 +153,12 @@ class AsyncQpsServerTest { void *got_tag; while (srv_cq_.Next(&got_tag, &ok)) { EXPECT_EQ(ok, true); - ServerRpcContext *ctx = detag(got_tag); + ServerRpcContext *ctx = detag(got_tag); // The tag is a pointer to an RPC context to invoke if ((*ctx)() == false) { - // this RPC context is done, so refresh it - ctx->refresh(); - } + // this RPC context is done, so refresh it + ctx->refresh(); + } } return; })); @@ -165,13 +167,14 @@ class AsyncQpsServerTest { std::this_thread::sleep_for(std::chrono::seconds(5)); } } + private: class ServerRpcContext { - public: + public: ServerRpcContext() {} - virtual ~ServerRpcContext() {}; - virtual bool operator()() = 0; // do next state, return false if all done - virtual void refresh() = 0; // start this back at a clean state + virtual ~ServerRpcContext(){}; + virtual bool operator()() = 0; // do next state, return false if all done + virtual void refresh() = 0; // start this back at a clean state }; static void *tag(ServerRpcContext *func) { return reinterpret_cast<void *>(func); @@ -192,25 +195,26 @@ class AsyncQpsServerTest { : next_state_(&ServerRpcContextUnaryImpl::invoker), request_method_(request_method), invoke_method_(invoke_method), - response_writer_(&srv_ctx_) { + response_writer_(&srv_ctx_) { request_method_(&srv_ctx_, &req_, &response_writer_, - AsyncQpsServerTest::tag(this)); + AsyncQpsServerTest::tag(this)); } ~ServerRpcContextUnaryImpl() override {} - bool operator()() override {return (this->*next_state_)();} + bool operator()() override { return (this->*next_state_)(); } void refresh() override { srv_ctx_ = ServerContext(); req_ = RequestType(); response_writer_ = - grpc::ServerAsyncResponseWriter<ResponseType>(&srv_ctx_); + grpc::ServerAsyncResponseWriter<ResponseType>(&srv_ctx_); // Then request the method next_state_ = &ServerRpcContextUnaryImpl::invoker; request_method_(&srv_ctx_, &req_, &response_writer_, - AsyncQpsServerTest::tag(this)); + AsyncQpsServerTest::tag(this)); } + private: - bool finisher() {return false;} + bool finisher() { return false; } bool invoker() { ResponseType response; @@ -219,8 +223,7 @@ class AsyncQpsServerTest { // Have the response writer work and invoke on_finish when done next_state_ = &ServerRpcContextUnaryImpl::finisher; - response_writer_.Finish(response, status, - AsyncQpsServerTest::tag(this)); + response_writer_.Finish(response, status, AsyncQpsServerTest::tag(this)); return true; } ServerContext srv_ctx_;