diff --git a/test/cpp/interop/client.cc b/test/cpp/interop/client.cc index ebc5cfc85a8602d401d2ed86f03769f081d217e3..ed410c9ef0ecaff799a6b87ccd8c14130f1408ab 100644 --- a/test/cpp/interop/client.cc +++ b/test/cpp/interop/client.cc @@ -56,8 +56,12 @@ DEFINE_string(test_case, "large_unary", "Configure different test cases. Valid options are: " "empty_unary : empty (zero bytes) request and response; " "large_unary : single request and (large) response; " + "large_compressed_unary : single request and compressed (large) " + "response; " "client_streaming : request streaming with single response; " "server_streaming : single request with response streaming; " + "server_compressed_streaming : single request with compressed " + "response streaming; " "slow_consumer : single request with response; " " streaming with slow client consumer; " "half_duplex : half-duplex streaming; " @@ -91,10 +95,14 @@ int main(int argc, char** argv) { client.DoEmpty(); } else if (FLAGS_test_case == "large_unary") { client.DoLargeUnary(); + } else if (FLAGS_test_case == "large_compressed_unary") { + client.DoLargeCompressedUnary(); } else if (FLAGS_test_case == "client_streaming") { client.DoRequestStreaming(); } else if (FLAGS_test_case == "server_streaming") { client.DoResponseStreaming(); + } else if (FLAGS_test_case == "server_compressed_streaming") { + client.DoResponseCompressedStreaming(); } else if (FLAGS_test_case == "slow_consumer") { client.DoResponseStreamingWithSlowConsumer(); } else if (FLAGS_test_case == "half_duplex") { @@ -129,6 +137,7 @@ int main(int argc, char** argv) { client.DoLargeUnary(); client.DoRequestStreaming(); client.DoResponseStreaming(); + client.DoResponseCompressedStreaming(); client.DoHalfDuplex(); client.DoPingPong(); client.DoCancelAfterBegin(); @@ -148,10 +157,11 @@ int main(int argc, char** argv) { gpr_log( GPR_ERROR, "Unsupported test case %s. Valid options are all|empty_unary|" - "large_unary|client_streaming|server_streaming|half_duplex|ping_pong|" - "cancel_after_begin|cancel_after_first_response|" - "timeout_on_sleeping_server|service_account_creds|compute_engine_creds|" - "jwt_token_creds|oauth2_auth_token|per_rpc_creds", + "large_unary|large_compressed_unary|client_streaming|server_streaming|" + "server_compressed_streaming|half_duplex|ping_pong|cancel_after_begin|" + "cancel_after_first_response|timeout_on_sleeping_server|" + "service_account_creds|compute_engine_creds|jwt_token_creds|" + "oauth2_auth_token|per_rpc_creds", FLAGS_test_case.c_str()); ret = 1; } diff --git a/test/cpp/interop/interop_client.cc b/test/cpp/interop/interop_client.cc index 8e2d778cff7489d19b01d6e9f9f4a6667856b00e..7ad0f31575e34cf3629bd85e0016f677ca0e6cef 100644 --- a/test/cpp/interop/interop_client.cc +++ b/test/cpp/interop/interop_client.cc @@ -101,13 +101,29 @@ void InteropClient::PerformLargeUnary(SimpleRequest* request, std::unique_ptr<TestService::Stub> stub(TestService::NewStub(channel_)); ClientContext context; - InteropClientContextInspector inspector(context); + request->set_response_type(PayloadType::COMPRESSABLE); request->set_response_size(kLargeResponseSize); grpc::string payload(kLargeRequestSize, '\0'); request->mutable_payload()->set_body(payload.c_str(), kLargeRequestSize); Status s = stub->UnaryCall(&context, *request, response); + AssertOkOrPrintErrorStatus(s); +} + +// Shared code to set large payload, make rpc and check response payload. +void InteropClient::PerformLargeCompressedUnary(SimpleRequest* request, + SimpleResponse* response) { + std::unique_ptr<TestService::Stub> stub(TestService::NewStub(channel_)); + + ClientContext context; + InteropClientContextInspector inspector(context); + request->set_response_size(kLargeResponseSize); + grpc::string payload(kLargeRequestSize, '\0'); + request->mutable_payload()->set_body(payload.c_str(), kLargeRequestSize); + + Status s = stub->CompressedUnaryCall(&context, *request, response); + // Compression related checks. GPR_ASSERT(request->response_compression() == GetInteropCompressionTypeFromCompressionAlgorithm( @@ -245,6 +261,14 @@ void InteropClient::DoJwtTokenCreds(const grpc::string& username) { } void InteropClient::DoLargeUnary() { + gpr_log(GPR_INFO, "Sending a large unary rpc..."); + SimpleRequest request; + SimpleResponse response; + PerformLargeUnary(&request, &response); + gpr_log(GPR_INFO, "Large unary done."); +} + +void InteropClient::DoLargeCompressedUnary() { const CompressionType compression_types[] = {NONE, GZIP, DEFLATE}; const PayloadType payload_types[] = {COMPRESSABLE, UNCOMPRESSABLE, RANDOM}; for (const auto payload_type : payload_types) { @@ -293,6 +317,32 @@ void InteropClient::DoRequestStreaming() { } void InteropClient::DoResponseStreaming() { + gpr_log(GPR_INFO, "Receiving response steaming rpc ..."); + std::unique_ptr<TestService::Stub> stub(TestService::NewStub(channel_)); + + ClientContext context; + StreamingOutputCallRequest request; + for (unsigned int i = 0; i < response_stream_sizes.size(); ++i) { + ResponseParameters* response_parameter = request.add_response_parameters(); + response_parameter->set_size(response_stream_sizes[i]); + } + StreamingOutputCallResponse response; + std::unique_ptr<ClientReader<StreamingOutputCallResponse>> stream( + stub->StreamingOutputCall(&context, request)); + + unsigned int i = 0; + while (stream->Read(&response)) { + GPR_ASSERT(response.payload().body() == + grpc::string(response_stream_sizes[i], '\0')); + ++i; + } + GPR_ASSERT(response_stream_sizes.size() == i); + Status s = stream->Finish(); + AssertOkOrPrintErrorStatus(s); + gpr_log(GPR_INFO, "Response streaming done."); +} + +void InteropClient::DoResponseCompressedStreaming() { std::unique_ptr<TestService::Stub> stub(TestService::NewStub(channel_)); const CompressionType compression_types[] = {NONE, GZIP, DEFLATE}; diff --git a/test/cpp/interop/interop_client.h b/test/cpp/interop/interop_client.h index 6e26c49e5da1d810161318657aac70a36832e1ea..995b13036a6205483a2e20dff99980092561311e 100644 --- a/test/cpp/interop/interop_client.h +++ b/test/cpp/interop/interop_client.h @@ -52,10 +52,12 @@ class InteropClient { void DoEmpty(); void DoLargeUnary(); + void DoLargeCompressedUnary(); void DoPingPong(); void DoHalfDuplex(); void DoRequestStreaming(); void DoResponseStreaming(); + void DoResponseCompressedStreaming(); void DoResponseStreamingWithSlowConsumer(); void DoCancelAfterBegin(); void DoCancelAfterFirstResponse(); @@ -78,6 +80,8 @@ class InteropClient { private: void PerformLargeUnary(SimpleRequest* request, SimpleResponse* response); + void PerformLargeCompressedUnary(SimpleRequest* request, + SimpleResponse* response); void AssertOkOrPrintErrorStatus(const Status& s); std::shared_ptr<ChannelInterface> channel_; diff --git a/test/cpp/interop/server.cc b/test/cpp/interop/server.cc index 0097d1678c2790eb0269029bc8c71702552c29ae..ac0a2e512bfbfca5a402475617a1dcaa37b8a6a0 100644 --- a/test/cpp/interop/server.cc +++ b/test/cpp/interop/server.cc @@ -140,16 +140,12 @@ class TestServiceImpl : public TestService::Service { Status UnaryCall(ServerContext* context, const SimpleRequest* request, SimpleResponse* response) { - InteropServerContextInspector inspector(*context); - SetResponseCompression(context, *request); if (request->has_response_size() && request->response_size() > 0) { if (!SetPayload(request->response_type(), request->response_size(), response->mutable_payload())) { return Status(grpc::StatusCode::INTERNAL, "Error creating payload."); } } - const gpr_uint32 client_accept_encodings_bitset = - inspector.GetEncodingsAcceptedByClient(); if (request->has_response_status()) { return Status(static_cast<grpc::StatusCode> @@ -160,6 +156,13 @@ class TestServiceImpl : public TestService::Service { return Status::OK; } + Status CompressedUnaryCall(ServerContext* context, + const SimpleRequest* request, + SimpleResponse* response) { + SetResponseCompression(context, *request); + return UnaryCall(context, request, response); + } + Status StreamingOutputCall( ServerContext* context, const StreamingOutputCallRequest* request, ServerWriter<StreamingOutputCallResponse>* writer) { @@ -180,6 +183,13 @@ class TestServiceImpl : public TestService::Service { } } + Status CompressedStreamingOutputCall( + ServerContext* context, const StreamingOutputCallRequest* request, + ServerWriter<StreamingOutputCallResponse>* writer) { + SetResponseCompression(context, *request); + return StreamingOutputCall(context, request, writer); + } + Status StreamingInputCall(ServerContext* context, ServerReader<StreamingInputCallRequest>* reader, StreamingInputCallResponse* response) { diff --git a/test/proto/test.proto b/test/proto/test.proto index 368522dc4c925d67390b1efdd2c52216aa5bf8d2..574c6a5b50e48c263fc47e70cf2ad855715ee521 100644 --- a/test/proto/test.proto +++ b/test/proto/test.proto @@ -48,6 +48,9 @@ service TestService { // TODO(Issue 527): Describe required server behavior. rpc UnaryCall(SimpleRequest) returns (SimpleResponse); + // One request followed by one compressed response. + rpc CompressedUnaryCall(SimpleRequest) returns (SimpleResponse); + // One request followed by a sequence of responses (streamed download). // The server returns the payload with client desired type and sizes. rpc StreamingOutputCall(StreamingOutputCallRequest)