diff --git a/include/grpc++/impl/codegen/method_handler_impl.h b/include/grpc++/impl/codegen/method_handler_impl.h index 8f417f671aaf9388717a88df0782d8572ec7ed58..bb992f0e185500677990e974ebc267170d9f2b45 100644 --- a/include/grpc++/impl/codegen/method_handler_impl.h +++ b/include/grpc++/impl/codegen/method_handler_impl.h @@ -243,8 +243,7 @@ class SplitServerStreamingHandler public: explicit SplitServerStreamingHandler( std::function<Status(ServerContext*, - SplitServerStreamingHandler<RequestType, - ResponseType>*)> + ServerSplitStreamer<RequestType, ResponseType>*)> func) : TemplatedBidiStreamingHandler< ServerSplitStreamer<RequestType, ResponseType>, false>(func) {} diff --git a/src/compiler/cpp_generator.cc b/src/compiler/cpp_generator.cc index d0f164e58dfdc5611b6f57f43d263854659d6098..8525a6a2c963601a09a5fe130dc23fe3214d6aae 100644 --- a/src/compiler/cpp_generator.cc +++ b/src/compiler/cpp_generator.cc @@ -691,7 +691,7 @@ void PrintHeaderServerMethodSplitStreaming( "// disable regular version of this method\n" "::grpc::Status $Method$(" "::grpc::ServerContext* context, const $Request$* request, " - "$Response$* response) GRPC_FINAL GRPC_OVERRIDE {\n" + "::grpc::ServerWriter< $Response$>* writer) GRPC_FINAL GRPC_OVERRIDE {\n" " abort();\n" " return ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, \"\");\n" "}\n"); diff --git a/src/proto/grpc/testing/duplicate/echo_duplicate.proto b/src/proto/grpc/testing/duplicate/echo_duplicate.proto index 94130ea767841ace09ead6e8340abfcc9ad1c93e..97fdbc4fd3091cafaf80df015243356686b3654c 100644 --- a/src/proto/grpc/testing/duplicate/echo_duplicate.proto +++ b/src/proto/grpc/testing/duplicate/echo_duplicate.proto @@ -38,4 +38,5 @@ package grpc.testing.duplicate; service EchoTestService { rpc Echo(grpc.testing.EchoRequest) returns (grpc.testing.EchoResponse); + rpc ResponseStream(EchoRequest) returns (stream EchoResponse); } diff --git a/test/cpp/end2end/hybrid_end2end_test.cc b/test/cpp/end2end/hybrid_end2end_test.cc index 8cd2e663470ceae2bc01e5ce8db01e85ed06a485..1ef2af2e2efcad18ef6ad17e5c96ffd50709b0f0 100644 --- a/test/cpp/end2end/hybrid_end2end_test.cc +++ b/test/cpp/end2end/hybrid_end2end_test.cc @@ -320,6 +320,26 @@ class HybridEnd2endTest : public ::testing::Test { EXPECT_TRUE(s.ok()); } + void SendSimpleServerStreamingToDupService() { + EchoRequest request; + EchoResponse response; + ClientContext context; + context.set_wait_for_ready(true); + request.set_message("hello"); + + auto stream = stub_->ResponseStream(&context, request); + EXPECT_TRUE(stream->Read(&response)); + EXPECT_EQ(response.message(), request.message() + "0_dup"); + EXPECT_TRUE(stream->Read(&response)); + EXPECT_EQ(response.message(), request.message() + "1_dup"); + EXPECT_TRUE(stream->Read(&response)); + EXPECT_EQ(response.message(), request.message() + "2_dup"); + EXPECT_FALSE(stream->Read(&response)); + + Status s = stream->Finish(); + EXPECT_TRUE(s.ok()); + } + void SendBidiStreaming() { EchoRequest request; EchoResponse response; @@ -498,6 +518,47 @@ TEST_F(HybridEnd2endTest, request_stream_handler_thread.join(); } +// Add a second service with one sync split server streaming method. +class SplitResponseStreamDupPkg + : public duplicate::EchoTestService::WithSplitStreamingMethod_ResponseStream< + TestServiceImplDupPkg> { + public: + Status StreamedResponseStream(ServerContext* context, + ServerSplitStreamer<EchoRequest, EchoResponse>* stream) + GRPC_OVERRIDE { + EchoRequest req; + EchoResponse resp; + uint32_t next_msg_sz; + stream->NextMessageSize(&next_msg_sz); + gpr_log(GPR_INFO, "Split Streamed Next Message Size is %u", next_msg_sz); + GPR_ASSERT(stream->Read(&req)); + for (int i = 0; i < kNumResponseStreamsMsgs; i++) { + resp.set_message(req.message() + grpc::to_string(i) + "_dup"); + GPR_ASSERT(stream->Write(resp)); + } + return Status::OK; + } +}; + +TEST_F(HybridEnd2endTest, + AsyncRequestStreamResponseStream_SyncSplitStreamedDupService) { + typedef EchoTestService::WithAsyncMethod_RequestStream< + EchoTestService::WithAsyncMethod_ResponseStream<TestServiceImpl>> + SType; + SType service; + SplitResponseStreamDupPkg dup_service; + SetUpServer(&service, &dup_service, nullptr, 8192); + ResetStub(); + std::thread response_stream_handler_thread(HandleServerStreaming<SType>, + &service, cqs_[0].get()); + std::thread request_stream_handler_thread(HandleClientStreaming<SType>, + &service, cqs_[1].get()); + TestAllMethods(); + SendSimpleServerStreamingToDupService(); + response_stream_handler_thread.join(); + request_stream_handler_thread.join(); +} + // Add a second service with one async method. TEST_F(HybridEnd2endTest, AsyncRequestStreamResponseStream_AsyncDupService) { typedef EchoTestService::WithAsyncMethod_RequestStream<