diff --git a/test/cpp/qps/client.h b/test/cpp/qps/client.h index 45481a3918082474471267b10da916442cf88d08..620103b77dec87b662b8bc286230fde2b9572acc 100644 --- a/test/cpp/qps/client.h +++ b/test/cpp/qps/client.h @@ -83,6 +83,7 @@ class Client { protected: SimpleRequest request_; + bool closed_loop_; class ClientChannelInfo { public: @@ -222,7 +223,6 @@ class Client { std::vector<std::unique_ptr<Thread>> threads_; std::unique_ptr<Timer> timer_; - bool closed_loop_; InterarrivalTimer interarrival_timer_; std::vector<std::chrono::time_point <std::chrono::high_resolution_clock>> next_time_; diff --git a/test/cpp/qps/client_async.cc b/test/cpp/qps/client_async.cc index e3ab57728daeac9ca9df00b949a6a04635641a44..fa1a799f1b53442407970afd22cc8b5af08e1ec1 100644 --- a/test/cpp/qps/client_async.cc +++ b/test/cpp/qps/client_async.cc @@ -55,6 +55,10 @@ namespace grpc { namespace testing { +typedef std::chrono::high_resolution_clock grpc_time_source; +typedef std::chrono::time_point<grpc_time_source> grpc_time; +typedef std::forward_list<grpc_time> deadline_list; + class ClientRpcContext { public: ClientRpcContext() {} @@ -66,6 +70,12 @@ class ClientRpcContext { static ClientRpcContext* detag(void* t) { return reinterpret_cast<ClientRpcContext*>(t); } + + deadline_list::iterator deadline_posn() const {return deadline_posn_;} + void set_deadline_posn(deadline_list::iterator&& it) {deadline_posn_ = it;} + virtual void Start() = 0; + private: + deadline_list::iterator deadline_posn_; }; template <class RequestType, class ResponseType> @@ -84,9 +94,11 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext { response_(), next_state_(&ClientRpcContextUnaryImpl::RespDone), callback_(on_done), - start_req_(start_req), - start_(Timer::Now()), - response_reader_(start_req(stub_, &context_, req_)) { + start_req_(start_req) { + } + void Start() GRPC_OVERRIDE { + start_ = Timer::Now(); + response_reader_.reset(start_req(stub_, &context_, req_)); response_reader_->Finish(&response_, &status_, ClientRpcContext::tag(this)); } ~ClientRpcContextUnaryImpl() GRPC_OVERRIDE {} @@ -133,14 +145,32 @@ class AsyncClient : public Client { Client(config) { for (int i = 0; i < config.async_client_threads(); i++) { cli_cqs_.emplace_back(new CompletionQueue); + if (!closed_loop_) { + rpc_deadlines_.emplace_back(); + next_channel_.push_back(i % channel_count_); + issue_allowed_.push_back(true); + + grpc_time next_issue; + NextIssueTime(i, &next_issue); + next_issue_.push_back(next_issue); + } } - int t = 0; - for (int i = 0; i < config.outstanding_rpcs_per_channel(); i++) { + if (!closed_loop_) { for (auto channel = channels_.begin(); channel != channels_.end(); channel++) { - auto* cq = cli_cqs_[t].get(); - t = (t + 1) % cli_cqs_.size(); - setup_ctx(cq, channel->get_stub(), request_); + channel_rpc_count_lock.emplace_back(); + rpcs_outstanding_.push_back(0); + } + } + else { + int t = 0; + for (int i = 0; i < config.outstanding_rpcs_per_channel(); i++) { + for (auto channel = channels_.begin(); channel != channels_.end(); + channel++) { + auto* cq = cli_cqs_[t].get(); + t = (t + 1) % cli_cqs_.size(); + setup_ctx(cq, channel->get_stub(), request_); + } } } } @@ -159,26 +189,68 @@ class AsyncClient : public Client { GRPC_OVERRIDE GRPC_FINAL { void* got_tag; bool ok; - switch (cli_cqs_[thread_idx]->AsyncNext(&got_tag, &ok, - std::chrono::system_clock::now() + - std::chrono::seconds(1))) { - case CompletionQueue::SHUTDOWN: return false; - case CompletionQueue::TIMEOUT: return true; - case CompletionQueue::GOT_EVENT: break; + grpc_time deadline, short_deadline; + if (closed_loop_) { + deadline = grpc_time_source::now() + std::chrono::seconds(1); + short_deadline = deadline; + } else { + deadline = *(rpc_deadlines_[thread_idx].begin()); + short_deadline = issue_allowed_[thread_idx] ? + next_issue_[thread_idx] : deadline; } - - ClientRpcContext* ctx = ClientRpcContext::detag(got_tag); - if (ctx->RunNextState(ok, histogram) == false) { - // call the callback and then delete it - ctx->RunNextState(ok, histogram); - ctx->StartNewClone(); - delete ctx; + switch (cli_cqs_[thread_idx]->AsyncNext(&got_tag, &ok, short_deadline)) { + case CompletionQueue::SHUTDOWN: return false; + case CompletionQueue::TIMEOUT: + got_event = false; + break; + case CompletionQueue::GOT_EVENT: + got_event = true; + break; } - - return true; + if (grpc_time_source::now() > deadline) { + // we have missed some 1-second deadline, which is too much gpr_log(GPR_INFO, "Missed an RPC deadline, giving up"); + return false; + } + if (got_event) { + ClientRpcContext* ctx = ClientRpcContext::detag(got_tag); + if (ctx->RunNextState(ok, histogram) == false) { + // call the callback and then delete it + rpc_deadlines_[thread_idx].erase_after(ctx->deadline_posn()); + ctx->RunNextState(ok, histogram); + ctx->StartNewClone(); + delete ctx; + } + issue_allowed_[thread_idx] = true; // may be ok now even if it hadn't been + } + if (issue_allowed && grpc_time_source::now() >= next_issue_[thread_idx]) { + // Attempt to issue + bool issued = false; + for (int num_attempts = 0; num_attempts < channel_count_ && !issued; + num_attempts++, next_channel_[thread_idx] = (next_channel_[thread_idx]+1)%channel_count_) { + std::lock_guard g(channel_rpc_count_lock_[next_channel_[thread_idx]]); + if (rpcs_outstanding[next_channel_[thread_idx]] < max_outstanding_per_channel_) { + // do the work to issue + rpcs_outstanding[next_channel_[thread_idx]]++; + issued = true; + } + } + if (!issued) + issue_allowed = false; + } + return true; } private: std::vector<std::unique_ptr<CompletionQueue>> cli_cqs_; + + std::vector<deadline_list> rpc_deadlines_; // per thread deadlines + std::vector<int> next_channel_; // per thread round-robin channel ctr + std::vector<bool> issue_allowed_; // may this thread attempt to issue + std::vector<grpc_time> next_issue_; // when should it issue? + + std::vector<std::mutex> channel_rpc_count_lock_; + std::vector<int> rpcs_outstanding_; // per-channel vector + int max_outstanding_per_channel_; + int channel_count_; }; class AsyncUnaryClient GRPC_FINAL : public AsyncClient { @@ -199,6 +271,7 @@ private: new ClientRpcContextUnaryImpl<SimpleRequest, SimpleResponse>( stub, req, start_req, check_done); } + }; template <class RequestType, class ResponseType> @@ -227,7 +300,7 @@ class ClientRpcContextStreamingImpl : public ClientRpcContext { void StartNewClone() GRPC_OVERRIDE { new ClientRpcContextStreamingImpl(stub_, req_, start_req_, callback_); } - + void Start() GRPC_OVERRIDE {} private: bool ReqSent(bool ok, Histogram *) { return StartWrite(ok);