diff --git a/Makefile b/Makefile index 1c3f77a5d6c38f90d0732da75784f4485284bb81..c2b2a0a7ec7f684e1ea4bc5a39f58c908805133a 100644 --- a/Makefile +++ b/Makefile @@ -1057,6 +1057,7 @@ LIBGRPC++_SRC = \ src/cpp/client/channel.cc \ src/cpp/client/client_context.cc \ src/cpp/client/create_channel.cc \ + src/cpp/client/credentials.cc \ src/cpp/client/internal_stub.cc \ src/cpp/proto/proto_utils.cc \ src/cpp/rpc_method.cc \ @@ -1066,6 +1067,7 @@ LIBGRPC++_SRC = \ src/cpp/server/server_builder.cc \ src/cpp/server/server.cc \ src/cpp/server/server_rpc_handler.cc \ + src/cpp/server/server_credentials.cc \ src/cpp/server/thread_pool.cc \ src/cpp/stream/stream_context.cc \ src/cpp/util/status.cc \ @@ -1079,12 +1081,13 @@ PUBLIC_HEADERS_CXX += \ include/grpc++/completion_queue.h \ include/grpc++/config.h \ include/grpc++/create_channel.h \ + include/grpc++/credentials.h \ include/grpc++/server_builder.h \ + include/grpc++/server_credentials.h \ include/grpc++/server.h \ include/grpc++/status.h \ include/grpc++/stream_context_interface.h \ include/grpc++/stream.h \ - include/grpc++/thread_pool_interface.h \ LIBGRPC++_OBJS = $(addprefix objs/, $(addsuffix .o, $(basename $(LIBGRPC++_SRC)))) LIBGRPC++_DEPS = $(addprefix deps/, $(addsuffix .dep, $(basename $(LIBGRPC++_SRC)))) @@ -4434,7 +4437,10 @@ clean_sync_client_async_server_test: QPS_CLIENT_SRC = \ - test/cpp/qps/qps_client.cc \ + gens/test/cpp/interop/empty.pb.cc \ + gens/test/cpp/interop/messages.pb.cc \ + gens/test/cpp/interop/test.pb.cc \ + test/cpp/qps/client.cc \ QPS_CLIENT_OBJS = $(addprefix objs/, $(addsuffix .o, $(basename $(QPS_CLIENT_SRC)))) QPS_CLIENT_DEPS = $(addprefix deps/, $(addsuffix .dep, $(basename $(QPS_CLIENT_SRC)))) @@ -4445,10 +4451,10 @@ bins/qps_client: openssl_dep_error else -bins/qps_client: $(QPS_CLIENT_OBJS) libs/libgrpc_test_util.a libs/libgrpc++.a libs/libgrpc.a libs/libgpr.a +bins/qps_client: $(QPS_CLIENT_OBJS) libs/libgrpc++_test_util.a libs/libgrpc_test_util.a libs/libgrpc++.a libs/libgrpc.a libs/libgpr.a $(E) "[LD] Linking $@" $(Q) mkdir -p `dirname $@` - $(Q) $(LDXX) $(LDFLAGS) $(QPS_CLIENT_OBJS) $(GTEST_LIB) -Llibs -lgrpc_test_util -lgrpc++ -lgrpc -lgpr $(LDLIBSXX) $(LDLIBS) $(LDLIBS_SECURE) -o bins/qps_client + $(Q) $(LDXX) $(LDFLAGS) $(QPS_CLIENT_OBJS) $(GTEST_LIB) -Llibs -lgrpc++_test_util -lgrpc_test_util -lgrpc++ -lgrpc -lgpr $(LDLIBSXX) $(LDLIBS) $(LDLIBS_SECURE) -o bins/qps_client endif @@ -4468,7 +4474,10 @@ clean_qps_client: QPS_SERVER_SRC = \ - test/cpp/end2end/qps_server.cc \ + gens/test/cpp/interop/empty.pb.cc \ + gens/test/cpp/interop/messages.pb.cc \ + gens/test/cpp/interop/test.pb.cc \ + test/cpp/qps/server.cc \ QPS_SERVER_OBJS = $(addprefix objs/, $(addsuffix .o, $(basename $(QPS_SERVER_SRC)))) QPS_SERVER_DEPS = $(addprefix deps/, $(addsuffix .dep, $(basename $(QPS_SERVER_SRC)))) @@ -4479,10 +4488,10 @@ bins/qps_server: openssl_dep_error else -bins/qps_server: $(QPS_SERVER_OBJS) libs/libgrpc_test_util.a libs/libgrpc++.a libs/libgrpc.a libs/libgpr.a +bins/qps_server: $(QPS_SERVER_OBJS) libs/libgrpc++_test_util.a libs/libgrpc_test_util.a libs/libgrpc++.a libs/libgrpc.a libs/libgpr.a $(E) "[LD] Linking $@" $(Q) mkdir -p `dirname $@` - $(Q) $(LDXX) $(LDFLAGS) $(QPS_SERVER_OBJS) $(GTEST_LIB) -Llibs -lgrpc_test_util -lgrpc++ -lgrpc -lgpr $(LDLIBSXX) $(LDLIBS) $(LDLIBS_SECURE) -o bins/qps_server + $(Q) $(LDXX) $(LDFLAGS) $(QPS_SERVER_OBJS) $(GTEST_LIB) -Llibs -lgrpc++_test_util -lgrpc_test_util -lgrpc++ -lgrpc -lgpr $(LDLIBSXX) $(LDLIBS) $(LDLIBS_SECURE) -o bins/qps_server endif diff --git a/build.json b/build.json index a1a9bd3810ae8d13fb0296a62e4d896271b039ac..1eae8080885811d383e894a83c9b49821d4c91e9 100644 --- a/build.json +++ b/build.json @@ -304,6 +304,7 @@ "src/cpp/client/channel.cc", "src/cpp/client/client_context.cc", "src/cpp/client/create_channel.cc", + "src/cpp/client/credentials.cc", "src/cpp/client/internal_stub.cc", "src/cpp/proto/proto_utils.cc", "src/cpp/rpc_method.cc", @@ -313,6 +314,7 @@ "src/cpp/server/server_builder.cc", "src/cpp/server/server.cc", "src/cpp/server/server_rpc_handler.cc", + "src/cpp/server/server_credentials.cc", "src/cpp/server/thread_pool.cc", "src/cpp/stream/stream_context.cc", "src/cpp/util/status.cc", @@ -326,12 +328,13 @@ "include/grpc++/completion_queue.h", "include/grpc++/config.h", "include/grpc++/create_channel.h", + "include/grpc++/credentials.h", "include/grpc++/server_builder.h", + "include/grpc++/server_credentials.h", "include/grpc++/server.h", "include/grpc++/status.h", "include/grpc++/stream_context_interface.h", - "include/grpc++/stream.h", - "include/grpc++/thread_pool_interface.h" + "include/grpc++/stream.h" ], "headers": [ "src/cpp/client/channel.h", @@ -1198,9 +1201,13 @@ "build": "test", "c++": true, "src": [ - "test/cpp/qps/qps_client.cc" + "test/cpp/interop/empty.proto", + "test/cpp/interop/messages.proto", + "test/cpp/interop/test.proto", + "test/cpp/qps/client.cc" ], "deps": [ + "grpc++_test_util", "grpc_test_util", "grpc++", "grpc", @@ -1212,9 +1219,13 @@ "build": "test", "c++": true, "src": [ - "test/cpp/end2end/qps_server.cc" + "test/cpp/interop/empty.proto", + "test/cpp/interop/messages.proto", + "test/cpp/interop/test.proto", + "test/cpp/qps/server.cc" ], "deps": [ + "grpc++_test_util", "grpc_test_util", "grpc++", "grpc", diff --git a/test/cpp/qps/client.cc b/test/cpp/qps/client.cc new file mode 100644 index 0000000000000000000000000000000000000000..e0f13a8ca2122cc00e807e38779f6020880f5f18 --- /dev/null +++ b/test/cpp/qps/client.cc @@ -0,0 +1,221 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + + +#include <cassert> +#include <memory> +#include <string> +#include <thread> +#include <vector> +#include <sstream> + +#include <grpc/grpc.h> +#include <grpc/support/histogram.h> +#include <grpc/support/log.h> +#include <google/gflags.h> +#include <grpc++/channel_interface.h> +#include <grpc++/client_context.h> +#include <grpc++/create_channel.h> +#include <grpc++/status.h> +#include "test/cpp/util/test_ssl_channel.h" +#include "test/cpp/interop/test.pb.h" + +DEFINE_bool(enable_ssl, false, "Whether to use ssl/tls."); +DEFINE_int32(server_port, 0, "Server port."); +DEFINE_string(server_host, "127.0.0.1", "Server host."); +DEFINE_int32(client_threads, 4, "Number of client threads."); + +// We have a configurable number of channels for sending RPCs. +// RPCs are sent round-robin on the available channels by the +// various threads. Interesting cases are 1 global channel or +// 1 per-thread channel, but we can support any number. +// The channels are assigned round-robin on an RPC by RPC basis +// rather than just at initialization time in order to also measure the +// impact of cache thrashing caused by channel changes. This is an issue +// if you are not in one of the above "interesting cases" +DEFINE_int32(client_channels, 4, "Number of client channels."); + +DEFINE_int32(num_rpcs, 1000, "Number of RPCs per thread."); +DEFINE_int32(payload_size, 1, "Payload size in bytes"); + +// Alternatively, specify parameters for test as a workload so that multiple +// tests are initiated back-to-back. This is convenient for keeping a borg +// allocation consistent. This is a space-separated list of +// [threads channels num_rpcs payload_size ]* +DEFINE_string(workload, "", "Workload parameters"); + +using grpc::ChannelInterface; +using grpc::CreateChannel; +using grpc::TestSslChannel; +using grpc::testing::SimpleRequest; +using grpc::testing::SimpleResponse; +using grpc::testing::TestService; + +std::shared_ptr<ChannelInterface> CreateTestChannel( + const grpc::string& server) { + std::shared_ptr<ChannelInterface> channel; + if (FLAGS_enable_ssl) { + channel.reset(new TestSslChannel(server)); + } else { + channel = CreateChannel(server); + } + return channel; +} + +static double now() { + gpr_timespec tv = gpr_now(); + return 1e9 * tv.tv_sec + tv.tv_nsec; +} + +void RunTest(const int client_threads, const int client_channels, + const int num_rpcs, const int payload_size) { + gpr_log(GPR_INFO, + "QPS test with parameters\n" + "enable_ssl = %d\n" + "client_channels = %d\n" + "client_threads = %d\n" + "num_rpcs = %d\n" + "payload_size = %d\n" + "server_host:server_port = %s:%d\n\n", + FLAGS_enable_ssl, client_channels, client_threads, num_rpcs, + payload_size, FLAGS_server_host.c_str(), FLAGS_server_port); + + std::ostringstream oss; + oss << FLAGS_server_host << ":" << FLAGS_server_port; + + class ClientChannelInfo { + public: + explicit ClientChannelInfo(const grpc::string &server) + : channel_(CreateTestChannel(server)), + stub_(TestService::NewStub(channel_)) {} + ChannelInterface *get_channel() { return channel_.get(); } + TestService::Stub *get_stub() { return stub_.get(); } + + private: + std::shared_ptr<ChannelInterface> channel_; + std::unique_ptr<TestService::Stub> stub_; + }; + + std::vector<ClientChannelInfo> channels; + for (int i = 0; i < client_channels; i++) { + channels.push_back(ClientChannelInfo(oss.str())); + } + + std::vector<std::thread> threads; // Will add threads when ready to execute + std::vector<::gpr_histogram *> thread_stats(client_threads); + + for (int i = 0; i < client_threads; i++) { + gpr_histogram *hist = gpr_histogram_create(0.01, 60e9); + GPR_ASSERT(hist != NULL); + thread_stats[i] = hist; + + threads.push_back(std::thread( + [hist, client_threads, client_channels, num_rpcs, payload_size, + &channels](int channel_num) { + SimpleRequest request; + SimpleResponse response; + request.set_response_type(grpc::testing::PayloadType::COMPRESSABLE); + request.set_response_size(payload_size); + + for (int j = 0; j < num_rpcs; j++) { + TestService::Stub *stub = channels[channel_num].get_stub(); + double start = now(); + grpc::ClientContext context; + grpc::Status s = stub->UnaryCall(&context, request, &response); + gpr_histogram_add(hist, now() - start); + + GPR_ASSERT((s.code() == grpc::StatusCode::OK) && + (response.payload().type() == + grpc::testing::PayloadType::COMPRESSABLE) && + (response.payload().body().length() == + static_cast<size_t>(payload_size))); + + // Now do runtime round-robin assignment of the next channel number + channel_num += client_threads; + channel_num %= client_channels; + } + }, + i % client_channels)); + } + + gpr_histogram *hist = gpr_histogram_create(0.01, 60e9); + GPR_ASSERT(hist != NULL); + for (auto& t : threads) { + t.join(); + } + for (int i = 0; i < client_threads; i++) { + gpr_histogram *h = thread_stats[i]; + gpr_log(GPR_INFO, "latency at thread %d (50/95/99/99.9): %f/%f/%f/%f", + i, + gpr_histogram_percentile(h, 50), + gpr_histogram_percentile(h, 95), + gpr_histogram_percentile(h, 99), + gpr_histogram_percentile(h, 99.9)); + gpr_histogram_merge(hist, h); + gpr_histogram_destroy(h); + } + + gpr_log( + GPR_INFO, + "latency across %d threads with %d channels and %d payload " + "(50/95/99/99.9): %f / %f / %f / %f", + client_threads, client_channels, payload_size, + gpr_histogram_percentile(hist, 50), gpr_histogram_percentile(hist, 95), + gpr_histogram_percentile(hist, 99), gpr_histogram_percentile(hist, 99.9)); + gpr_histogram_destroy(hist); +} + +int main(int argc, char **argv) { + grpc_init(); + google::ParseCommandLineFlags(&argc, &argv, true); + + GPR_ASSERT(FLAGS_server_port); + + if (FLAGS_workload.length() == 0) { + RunTest(FLAGS_client_threads, FLAGS_client_channels, FLAGS_num_rpcs, + FLAGS_payload_size); + } else { + std::istringstream workload(FLAGS_workload); + int client_threads, client_channels, num_rpcs, payload_size; + workload >> client_threads; + while (!workload.eof()) { + workload >> client_channels >> num_rpcs >> payload_size; + RunTest(client_threads, client_channels, num_rpcs, payload_size); + workload >> client_threads; + } + gpr_log(GPR_INFO, "Done with specified workload."); + } + + grpc_shutdown(); + return 0; +} diff --git a/test/cpp/qps/server.cc b/test/cpp/qps/server.cc new file mode 100644 index 0000000000000000000000000000000000000000..7e6c01e24e6041b3850ff19687fa8159c12b70ab --- /dev/null +++ b/test/cpp/qps/server.cc @@ -0,0 +1,116 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include <thread> + +#include <google/gflags.h> +#include <grpc/support/alloc.h> +#include <grpc/support/host_port.h> +#include <grpc++/config.h> +#include <grpc++/server.h> +#include <grpc++/server_builder.h> +#include <grpc++/status.h> +#include "test/cpp/interop/test.pb.h" + +#include <grpc/grpc.h> +#include <grpc/support/log.h> + +DEFINE_bool(enable_ssl, false, "Whether to use ssl/tls."); +DEFINE_int32(port, 0, "Server port."); + +using grpc::Server; +using grpc::ServerBuilder; +using grpc::testing::Payload; +using grpc::testing::PayloadType; +using grpc::testing::SimpleRequest; +using grpc::testing::SimpleResponse; +using grpc::testing::TestService; +using grpc::Status; + +bool SetPayload(PayloadType type, int size, Payload* payload) { + PayloadType response_type = type; + // TODO(yangg): Support UNCOMPRESSABLE payload. + if (type != PayloadType::COMPRESSABLE) { + return false; + } + payload->set_type(response_type); + std::unique_ptr<char[]> body(new char[size]()); + payload->set_body(body.get(), size); + return true; +} + +class TestServiceImpl : public TestService::Service { + public: + Status UnaryCall(const SimpleRequest* request, SimpleResponse* response) { + if (request->has_response_size() && request->response_size() > 0) { + if (!SetPayload(request->response_type(), request->response_size(), + response->mutable_payload())) { + return Status(grpc::StatusCode::INTERNAL, "Error creating payload."); + } + } + return Status::OK; + } +}; + +void RunServer() { + char* server_address = NULL; + gpr_join_host_port(&server_address, "::", FLAGS_port); + + TestServiceImpl service; + + SimpleRequest request; + SimpleResponse response; + + ServerBuilder builder; + builder.AddPort(server_address); + builder.RegisterService(service.service()); + std::unique_ptr<Server> server(builder.BuildAndStart()); + gpr_log(GPR_INFO, "Server listening on %s\n", server_address); + while (true) { + std::this_thread::sleep_for(std::chrono::seconds(5)); + } + + gpr_free(server_address); +} + +int main(int argc, char** argv) { + grpc_init(); + google::ParseCommandLineFlags(&argc, &argv, true); + + GPR_ASSERT(FLAGS_port != 0); + GPR_ASSERT(!FLAGS_enable_ssl); + RunServer(); + + grpc_shutdown(); + return 0; +}