From 61536a737176e5429b0b1f336e054f9743a03b91 Mon Sep 17 00:00:00 2001 From: Vijay Pai <vpai@google.com> Date: Thu, 13 Oct 2016 11:04:35 -0700 Subject: [PATCH] Making split streaming testable --- .../grpc++/impl/codegen/method_handler_impl.h | 3 +- src/compiler/cpp_generator.cc | 2 +- .../testing/duplicate/echo_duplicate.proto | 1 + test/cpp/end2end/hybrid_end2end_test.cc | 61 +++++++++++++++++++ 4 files changed, 64 insertions(+), 3 deletions(-) diff --git a/include/grpc++/impl/codegen/method_handler_impl.h b/include/grpc++/impl/codegen/method_handler_impl.h index 8f417f671a..bb992f0e18 100644 --- a/include/grpc++/impl/codegen/method_handler_impl.h +++ b/include/grpc++/impl/codegen/method_handler_impl.h @@ -243,8 +243,7 @@ class SplitServerStreamingHandler public: explicit SplitServerStreamingHandler( std::function<Status(ServerContext*, - SplitServerStreamingHandler<RequestType, - ResponseType>*)> + ServerSplitStreamer<RequestType, ResponseType>*)> func) : TemplatedBidiStreamingHandler< ServerSplitStreamer<RequestType, ResponseType>, false>(func) {} diff --git a/src/compiler/cpp_generator.cc b/src/compiler/cpp_generator.cc index d0f164e58d..8525a6a2c9 100644 --- a/src/compiler/cpp_generator.cc +++ b/src/compiler/cpp_generator.cc @@ -691,7 +691,7 @@ void PrintHeaderServerMethodSplitStreaming( "// disable regular version of this method\n" "::grpc::Status $Method$(" "::grpc::ServerContext* context, const $Request$* request, " - "$Response$* response) GRPC_FINAL GRPC_OVERRIDE {\n" + "::grpc::ServerWriter< $Response$>* writer) GRPC_FINAL GRPC_OVERRIDE {\n" " abort();\n" " return ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, \"\");\n" "}\n"); diff --git a/src/proto/grpc/testing/duplicate/echo_duplicate.proto b/src/proto/grpc/testing/duplicate/echo_duplicate.proto index 94130ea767..97fdbc4fd3 100644 --- a/src/proto/grpc/testing/duplicate/echo_duplicate.proto +++ b/src/proto/grpc/testing/duplicate/echo_duplicate.proto @@ -38,4 +38,5 @@ package grpc.testing.duplicate; service EchoTestService { rpc Echo(grpc.testing.EchoRequest) returns (grpc.testing.EchoResponse); + rpc ResponseStream(EchoRequest) returns (stream EchoResponse); } diff --git a/test/cpp/end2end/hybrid_end2end_test.cc b/test/cpp/end2end/hybrid_end2end_test.cc index 8cd2e66347..1ef2af2e2e 100644 --- a/test/cpp/end2end/hybrid_end2end_test.cc +++ b/test/cpp/end2end/hybrid_end2end_test.cc @@ -320,6 +320,26 @@ class HybridEnd2endTest : public ::testing::Test { EXPECT_TRUE(s.ok()); } + void SendSimpleServerStreamingToDupService() { + EchoRequest request; + EchoResponse response; + ClientContext context; + context.set_wait_for_ready(true); + request.set_message("hello"); + + auto stream = stub_->ResponseStream(&context, request); + EXPECT_TRUE(stream->Read(&response)); + EXPECT_EQ(response.message(), request.message() + "0_dup"); + EXPECT_TRUE(stream->Read(&response)); + EXPECT_EQ(response.message(), request.message() + "1_dup"); + EXPECT_TRUE(stream->Read(&response)); + EXPECT_EQ(response.message(), request.message() + "2_dup"); + EXPECT_FALSE(stream->Read(&response)); + + Status s = stream->Finish(); + EXPECT_TRUE(s.ok()); + } + void SendBidiStreaming() { EchoRequest request; EchoResponse response; @@ -498,6 +518,47 @@ TEST_F(HybridEnd2endTest, request_stream_handler_thread.join(); } +// Add a second service with one sync split server streaming method. +class SplitResponseStreamDupPkg + : public duplicate::EchoTestService::WithSplitStreamingMethod_ResponseStream< + TestServiceImplDupPkg> { + public: + Status StreamedResponseStream(ServerContext* context, + ServerSplitStreamer<EchoRequest, EchoResponse>* stream) + GRPC_OVERRIDE { + EchoRequest req; + EchoResponse resp; + uint32_t next_msg_sz; + stream->NextMessageSize(&next_msg_sz); + gpr_log(GPR_INFO, "Split Streamed Next Message Size is %u", next_msg_sz); + GPR_ASSERT(stream->Read(&req)); + for (int i = 0; i < kNumResponseStreamsMsgs; i++) { + resp.set_message(req.message() + grpc::to_string(i) + "_dup"); + GPR_ASSERT(stream->Write(resp)); + } + return Status::OK; + } +}; + +TEST_F(HybridEnd2endTest, + AsyncRequestStreamResponseStream_SyncSplitStreamedDupService) { + typedef EchoTestService::WithAsyncMethod_RequestStream< + EchoTestService::WithAsyncMethod_ResponseStream<TestServiceImpl>> + SType; + SType service; + SplitResponseStreamDupPkg dup_service; + SetUpServer(&service, &dup_service, nullptr, 8192); + ResetStub(); + std::thread response_stream_handler_thread(HandleServerStreaming<SType>, + &service, cqs_[0].get()); + std::thread request_stream_handler_thread(HandleClientStreaming<SType>, + &service, cqs_[1].get()); + TestAllMethods(); + SendSimpleServerStreamingToDupService(); + response_stream_handler_thread.join(); + request_stream_handler_thread.join(); +} + // Add a second service with one async method. TEST_F(HybridEnd2endTest, AsyncRequestStreamResponseStream_AsyncDupService) { typedef EchoTestService::WithAsyncMethod_RequestStream< -- GitLab