From e4886680752e9181e0d848c3046e4a8d1eddffa3 Mon Sep 17 00:00:00 2001 From: Vijay Pai <vpai@google.com> Date: Wed, 30 Dec 2015 11:56:19 -0800 Subject: [PATCH] WIP --- test/cpp/qps/client.h | 121 +++++++++++++++++++---------------- test/cpp/qps/client_async.cc | 23 +++++-- test/cpp/qps/client_sync.cc | 7 +- 3 files changed, 86 insertions(+), 65 deletions(-) diff --git a/test/cpp/qps/client.h b/test/cpp/qps/client.h index b11a83b570..9a2894687d 100644 --- a/test/cpp/qps/client.h +++ b/test/cpp/qps/client.h @@ -37,6 +37,9 @@ #include <condition_variable> #include <mutex> +#include <grpc++/support/byte_buffer.h> +#include <grpc++/support/slice.h> + #include "test/cpp/qps/histogram.h" #include "test/cpp/qps/interarrival.h" #include "test/cpp/qps/timer.h" @@ -74,7 +77,7 @@ void CreateRequest(RequestType *req, const PayloadConfig&) { // check since these only happen at the beginning anyway GPR_ASSERT(false); } - + template <> void CreateRequest<SimpleRequest>(SimpleRequest *req, const PayloadConfig& payload_config) { if (payload_config.has_bytebuf_params()) { @@ -96,11 +99,15 @@ void CreateRequest<SimpleRequest>(SimpleRequest *req, const PayloadConfig& paylo } } template <> -void CreateRequest<ByteBuffer>(ByteBuffer *req, const PayloadConfig& payload_config) { +void CreateRequest<ByteBuffer>(ByteBuffer *req, + const PayloadConfig& payload_config) { if (payload_config.has_bytebuf_params()) { - if (payload_config.req_size() > 0) { - std::unique_ptr<char> buf(new char[payload_config.req_size()]); - gpr_slice_from_copied_buffer(buf.get(), payload_config.req_size()); + if (payload_config.bytebuf_params().req_size() > 0) { + std::unique_ptr<char> + buf(new char[payload_config.bytebuf_params().req_size()]); + gpr_slice s = + gpr_slice_from_copied_buffer(buf.get(), + payload_config.bytebuf_params().req_size()); Slice slice(s, Slice::STEAL_REF); std::unique_ptr<ByteBuffer> bbuf(new ByteBuffer(&slice, 1)); req->MoveFrom(bbuf.get()); @@ -110,24 +117,11 @@ void CreateRequest<ByteBuffer>(ByteBuffer *req, const PayloadConfig& payload_con } } } - -template <class StubType, class RequestType> + class Client { public: - Client(const ClientConfig& config, - std::function<std::unique_ptr<StubType>(std::shared_ptr<Channel>)> create_stub) - : channels_(config.client_channels()), - create_stub_(create_stub), - timer_(new Timer), - interarrival_timer_() { - for (int i = 0; i < config.client_channels(); i++) { - channels_[i].init(config.server_targets(i % config.server_targets_size()), - config); - } - - ClientRequestCreation::CreateRequest<RequestType>(&request_, config.payload_config()); - } - virtual ~Client() {} + Client() : timer_(new Timer), interarrival_timer_() {} + virtual ~Client(); ClientStats Mark(bool reset) { Histogram latencies; @@ -162,40 +156,9 @@ class Client { stats.set_time_user(timer_result.user); return stats; } - protected: - RequestType request_; bool closed_loop_; - class ClientChannelInfo { - public: - ClientChannelInfo() {} - ClientChannelInfo(const ClientChannelInfo& i) { - // The copy constructor is to satisfy old compilers - // that need it for using std::vector . It is only ever - // used for empty entries - GPR_ASSERT(!i.channel_ && !i.stub_); - } - void init(const grpc::string& target, const ClientConfig& config) { - // We have to use a 2-phase init like this with a default - // constructor followed by an initializer function to make - // old compilers happy with using this in std::vector - channel_ = CreateTestChannel( - target, config.security_params().server_host_override(), - config.has_security_params(), - !config.security_params().use_test_ca()); - stub_ = create_stub_(channel_); - } - Channel* get_channel() { return channel_.get(); } - StubType* get_stub() { return stub_.get(); } - - private: - std::shared_ptr<Channel> channel_; - std::unique_ptr<StubType> stub_; - }; - std::vector<ClientChannelInfo> channels_; - std::function<std::unique_ptr<StubType>(std::shared_ptr<Channel>)> create_stub_; - void StartThreads(size_t num_threads) { for (size_t i = 0; i < num_threads; i++) { threads_.emplace_back(new Thread(this, i)); @@ -264,7 +227,6 @@ class Client { return true; } } - private: class Thread { public: @@ -326,8 +288,6 @@ class Client { } } - BenchmarkService::Stub* stub_; - ClientConfig config_; std::mutex mu_; std::condition_variable cv_; bool done_; @@ -337,7 +297,7 @@ class Client { size_t idx_; std::thread impl_; }; - + std::vector<std::unique_ptr<Thread>> threads_; std::unique_ptr<Timer> timer_; @@ -345,6 +305,55 @@ class Client { std::vector<grpc_time> next_time_; }; +template <class StubType, class RequestType> +class ClientImpl : public Client { + public: + ClientImpl(const ClientConfig& config, + std::function<std::unique_ptr<StubType>(std::shared_ptr<Channel>)> create_stub) + : channels_(config.client_channels()), + create_stub_(create_stub) { + for (int i = 0; i < config.client_channels(); i++) { + channels_[i].init(config.server_targets(i % config.server_targets_size()), + config); + } + + ClientRequestCreation::CreateRequest<RequestType>(&request_, config.payload_config()); + } + virtual ~ClientImpl() {} + + protected: + RequestType request_; + + class ClientChannelInfo { + public: + ClientChannelInfo() {} + ClientChannelInfo(const ClientChannelInfo& i) { + // The copy constructor is to satisfy old compilers + // that need it for using std::vector . It is only ever + // used for empty entries + GPR_ASSERT(!i.channel_ && !i.stub_); + } + void init(const grpc::string& target, const ClientConfig& config) { + // We have to use a 2-phase init like this with a default + // constructor followed by an initializer function to make + // old compilers happy with using this in std::vector + channel_ = CreateTestChannel( + target, config.security_params().server_host_override(), + config.has_security_params(), + !config.security_params().use_test_ca()); + stub_ = create_stub_(channel_); + } + Channel* get_channel() { return channel_.get(); } + StubType* get_stub() { return stub_.get(); } + + private: + std::shared_ptr<Channel> channel_; + std::unique_ptr<StubType> stub_; + }; + std::vector<ClientChannelInfo> channels_; + std::function<std::unique_ptr<StubType>(std::shared_ptr<Channel>)> create_stub_; +}; + std::unique_ptr<Client> CreateSynchronousUnaryClient(const ClientConfig& args); std::unique_ptr<Client> CreateSynchronousStreamingClient( const ClientConfig& args); diff --git a/test/cpp/qps/client_async.cc b/test/cpp/qps/client_async.cc index c05774c410..fdfe1a567a 100644 --- a/test/cpp/qps/client_async.cc +++ b/test/cpp/qps/client_async.cc @@ -47,6 +47,7 @@ #include <grpc/support/log.h> #include <gflags/gflags.h> #include <grpc++/client_context.h> +#include <grpc++/generic/generic_stub.h> #include "test/cpp/qps/timer.h" #include "test/cpp/qps/client.h" @@ -148,13 +149,20 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext { typedef std::forward_list<ClientRpcContext*> context_list; template <class StubType, class RequestType> -class AsyncClient : public Client<StubType, RequestType> { +class AsyncClient : public ClientImpl<StubType, RequestType> { + // Specify which protected members we are using since there is no + // member name resolution until the template types are fully resolved public: + using Client::SetupLoadTest; + using Client::NextIssueTime; + using Client::closed_loop_; + using ClientImpl<StubType,RequestType>::channels_; + using ClientImpl<StubType,RequestType>::request_; AsyncClient( const ClientConfig& config, std::function<ClientRpcContext*(int, StubType*, const RequestType&)> setup_ctx, std::function<std::unique_ptr<StubType>(std::shared_ptr<Channel>)> create_stub) - : Client(config, create_stub), + : ClientImpl<StubType,RequestType>(config, create_stub), channel_lock_(new std::mutex[config.client_channels()]), contexts_(config.client_channels()), max_outstanding_per_channel_(config.outstanding_rpcs_per_channel()), @@ -344,7 +352,8 @@ class AsyncClient : public Client<StubType, RequestType> { int pref_channel_inc_; }; -class AsyncUnaryClient GRPC_FINAL : public AsyncClient<BenchmarkService::Stub, SimpleRequest> { +class AsyncUnaryClient GRPC_FINAL : + public AsyncClient<BenchmarkService::Stub, SimpleRequest> { public: explicit AsyncUnaryClient(const ClientConfig& config) : AsyncClient(config, SetupCtx, BenchmarkService::NewStub) { @@ -559,10 +568,10 @@ class GenericAsyncStreamingClient GRPC_FINAL : public AsyncClient<grpc::GenericS }; static ClientRpcContext* SetupCtx(int channel_id, grpc::GenericStub* stub, - const SimpleRequest& req) { - return new ClientRpcContextStreamingImpl<SimpleRequest, SimpleResponse>( - channel_id, stub, req, AsyncStreamingClient::StartReq, - AsyncStreamingClient::CheckDone); + const ByteBuffer& req) { + return new ClientGenericRpcContextStreamingImpl( + channel_id, stub, req, GenericAsyncStreamingClient::StartReq, + GenericAsyncStreamingClient::CheckDone); } }; diff --git a/test/cpp/qps/client_sync.cc b/test/cpp/qps/client_sync.cc index 10d680860a..409fc26972 100644 --- a/test/cpp/qps/client_sync.cc +++ b/test/cpp/qps/client_sync.cc @@ -64,9 +64,12 @@ namespace grpc { namespace testing { -class SynchronousClient : public Client { +class SynchronousClient : + public ClientImpl<BenchmarkService::Stub, SimpleRequest> { public: - SynchronousClient(const ClientConfig& config) : Client(config) { + SynchronousClient(const ClientConfig& config) : + ClientImpl<BenchmarkService::Stub, + SimpleRequest>(config, BenchmarkService::NewStub) { num_threads_ = config.outstanding_rpcs_per_channel() * config.client_channels(); responses_.resize(num_threads_); -- GitLab