Commit 37f72575 authored by vjpai

WIP

parent 0426ad0f
@@ -83,6 +83,7 @@ class Client {
protected:
SimpleRequest request_;
bool closed_loop_;
class ClientChannelInfo {
public:
@@ -222,7 +223,6 @@ class Client {
std::vector<std::unique_ptr<Thread>> threads_;
std::unique_ptr<Timer> timer_;
bool closed_loop_;
InterarrivalTimer interarrival_timer_;
std::vector<std::chrono::time_point
<std::chrono::high_resolution_clock>> next_time_;
@@ -55,6 +55,10 @@
namespace grpc {
namespace testing {
typedef std::chrono::high_resolution_clock grpc_time_source;
typedef std::chrono::time_point<grpc_time_source> grpc_time;
typedef std::forward_list<grpc_time> deadline_list;
class ClientRpcContext {
public:
ClientRpcContext() {}
@@ -66,6 +70,12 @@ class ClientRpcContext {
static ClientRpcContext* detag(void* t) {
return reinterpret_cast<ClientRpcContext*>(t);
}
deadline_list::iterator deadline_posn() const { return deadline_posn_; }
void set_deadline_posn(deadline_list::iterator&& it) { deadline_posn_ = it; }
virtual void Start() = 0;
private:
deadline_list::iterator deadline_posn_;
};
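The deadline_posn_ iterator added above only works because of how std::forward_list removes elements: there is no erase(it), only erase_after(it), so each context has to remember the position just before its own deadline entry. A standalone sketch of that bookkeeping, not part of this commit, using only the typedefs above:

#include <chrono>
#include <forward_list>

typedef std::chrono::high_resolution_clock grpc_time_source;
typedef std::chrono::time_point<grpc_time_source> grpc_time;
typedef std::forward_list<grpc_time> deadline_list;

int main() {
  deadline_list deadlines;
  // Record a 1-second deadline for a new RPC at the front of the list.
  deadlines.push_front(grpc_time_source::now() + std::chrono::seconds(1));
  // The position a context would stash via set_deadline_posn(): the iterator
  // *before* the entry it owns (here, before_begin() for the front element).
  deadline_list::iterator posn = deadlines.before_begin();
  // When the RPC completes, drop its deadline, mirroring
  // rpc_deadlines_[thread_idx].erase_after(ctx->deadline_posn()).
  deadlines.erase_after(posn);
  return 0;
}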
template <class RequestType, class ResponseType>
@@ -84,9 +94,11 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext {
response_(),
next_state_(&ClientRpcContextUnaryImpl::RespDone),
callback_(on_done),
start_req_(start_req),
start_(Timer::Now()),
response_reader_(start_req(stub_, &context_, req_)) {
start_req_(start_req) {
}
void Start() GRPC_OVERRIDE {
start_ = Timer::Now();
response_reader_.reset(start_req_(stub_, &context_, req_));
response_reader_->Finish(&response_, &status_, ClientRpcContext::tag(this));
}
~ClientRpcContextUnaryImpl() GRPC_OVERRIDE {}
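Splitting construction from Start() above is what lets the open-loop path later in this diff build contexts ahead of time and only issue them, and start timing, when the schedule allows. A toy, self-contained version of that deferred-start shape, with illustrative names rather than the diff's types:

#include <chrono>
#include <functional>
#include <utility>

// The latency clock starts in Start(), not in the constructor, so a context
// can sit idle until the issue schedule says it is time to send.
class DeferredCall {
 public:
  explicit DeferredCall(std::function<void()> issue) : issue_(std::move(issue)) {}
  void Start() {
    start_ = std::chrono::high_resolution_clock::now();  // begin timing here
    issue_();  // actually put the call on the wire
  }
  double ElapsedSeconds() const {
    return std::chrono::duration<double>(
               std::chrono::high_resolution_clock::now() - start_)
        .count();
  }

 private:
  std::function<void()> issue_;
  std::chrono::time_point<std::chrono::high_resolution_clock> start_;
};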
@@ -133,14 +145,32 @@ class AsyncClient : public Client {
Client(config) {
for (int i = 0; i < config.async_client_threads(); i++) {
cli_cqs_.emplace_back(new CompletionQueue);
if (!closed_loop_) {
rpc_deadlines_.emplace_back();
next_channel_.push_back(i % channel_count_);
issue_allowed_.push_back(true);
grpc_time next_issue;
NextIssueTime(i, &next_issue);
next_issue_.push_back(next_issue);
}
}
int t = 0;
for (int i = 0; i < config.outstanding_rpcs_per_channel(); i++) {
if (!closed_loop_) {
for (auto channel = channels_.begin(); channel != channels_.end();
channel++) {
auto* cq = cli_cqs_[t].get();
t = (t + 1) % cli_cqs_.size();
setup_ctx(cq, channel->get_stub(), request_);
channel_rpc_count_lock_.emplace_back();
rpcs_outstanding_.push_back(0);
}
}
else {
int t = 0;
for (int i = 0; i < config.outstanding_rpcs_per_channel(); i++) {
for (auto channel = channels_.begin(); channel != channels_.end();
channel++) {
auto* cq = cli_cqs_[t].get();
t = (t + 1) % cli_cqs_.size();
setup_ctx(cq, channel->get_stub(), request_);
}
}
}
}
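NextIssueTime and the InterarrivalTimer member live outside this diff; the open-loop idea is that each thread gets a schedule of issue times whose gaps come from some interarrival distribution. A rough sketch of a Poisson (exponential-gap) schedule, purely illustrative and not the commit's actual timer:

#include <chrono>
#include <random>

typedef std::chrono::high_resolution_clock grpc_time_source;
typedef std::chrono::time_point<grpc_time_source> grpc_time;

// Hypothetical stand-in for the commit's InterarrivalTimer / NextIssueTime.
class PoissonIssueSchedule {
 public:
  explicit PoissonIssueSchedule(double rpcs_per_second)
      : next_(grpc_time_source::now()),
        gap_seconds_(rpcs_per_second),
        gen_(std::random_device{}()) {}
  // Advance the schedule by an Exp(rate) gap; issuing whenever now() passes
  // the returned time yields a Poisson arrival process of the given rate.
  grpc_time Next() {
    next_ += std::chrono::duration_cast<grpc_time_source::duration>(
        std::chrono::duration<double>(gap_seconds_(gen_)));
    return next_;
  }

 private:
  grpc_time next_;
  std::exponential_distribution<double> gap_seconds_;
  std::mt19937 gen_;
};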
@@ -159,26 +189,68 @@ class AsyncClient : public Client {
GRPC_OVERRIDE GRPC_FINAL {
void* got_tag;
bool ok;
switch (cli_cqs_[thread_idx]->AsyncNext(&got_tag, &ok,
std::chrono::system_clock::now() +
std::chrono::seconds(1))) {
case CompletionQueue::SHUTDOWN: return false;
case CompletionQueue::TIMEOUT: return true;
case CompletionQueue::GOT_EVENT: break;
grpc_time deadline, short_deadline;
if (closed_loop_) {
deadline = grpc_time_source::now() + std::chrono::seconds(1);
short_deadline = deadline;
} else {
deadline = *(rpc_deadlines_[thread_idx].begin());
short_deadline = issue_allowed_[thread_idx] ?
next_issue_[thread_idx] : deadline;
}
ClientRpcContext* ctx = ClientRpcContext::detag(got_tag);
if (ctx->RunNextState(ok, histogram) == false) {
// call the callback and then delete it
ctx->RunNextState(ok, histogram);
ctx->StartNewClone();
delete ctx;
bool got_event;
switch (cli_cqs_[thread_idx]->AsyncNext(&got_tag, &ok, short_deadline)) {
case CompletionQueue::SHUTDOWN: return false;
case CompletionQueue::TIMEOUT:
got_event = false;
break;
case CompletionQueue::GOT_EVENT:
got_event = true;
break;
}
return true;
if (grpc_time_source::now() > deadline) {
// we have missed some 1-second deadline, which is too much
gpr_log(GPR_INFO, "Missed an RPC deadline, giving up");
return false;
}
if (got_event) {
ClientRpcContext* ctx = ClientRpcContext::detag(got_tag);
if (ctx->RunNextState(ok, histogram) == false) {
// call the callback and then delete it
rpc_deadlines_[thread_idx].erase_after(ctx->deadline_posn());
ctx->RunNextState(ok, histogram);
ctx->StartNewClone();
delete ctx;
}
issue_allowed_[thread_idx] = true; // may be ok now even if it hadn't been
}
if (issue_allowed_[thread_idx] &&
    grpc_time_source::now() >= next_issue_[thread_idx]) {
// Attempt to issue
bool issued = false;
for (int num_attempts = 0; num_attempts < channel_count_ && !issued;
     num_attempts++, next_channel_[thread_idx] =
                         (next_channel_[thread_idx] + 1) % channel_count_) {
std::lock_guard<std::mutex> g(channel_rpc_count_lock_[next_channel_[thread_idx]]);
if (rpcs_outstanding_[next_channel_[thread_idx]] < max_outstanding_per_channel_) {
// do the work to issue
rpcs_outstanding_[next_channel_[thread_idx]]++;
issued = true;
}
}
if (!issued) {
issue_allowed_[thread_idx] = false;
}
}
return true;
}
private:
std::vector<std::unique_ptr<CompletionQueue>> cli_cqs_;
std::vector<deadline_list> rpc_deadlines_; // per thread deadlines
std::vector<int> next_channel_; // per thread round-robin channel ctr
std::vector<bool> issue_allowed_; // may this thread attempt to issue
std::vector<grpc_time> next_issue_; // when should it issue?
std::vector<std::mutex> channel_rpc_count_lock_;
std::vector<int> rpcs_outstanding_; // per-channel vector
int max_outstanding_per_channel_;
int channel_count_;
};
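The issue gate in ThreadFunc above is round-robin over channels with a per-channel cap on outstanding RPCs, guarded by one mutex per channel. (Note that std::vector<std::mutex> cannot be grown with emplace_back once it has to reallocate, since std::mutex is neither copyable nor movable; a sketch can simply size the vector up front.) A minimal standalone version of that gate, with hypothetical names:

#include <mutex>
#include <vector>

// Try each channel once, starting at *next_channel and advancing round-robin.
// Returns true if some channel still has room under the outstanding-RPC cap;
// returns false if every channel is saturated, in which case the caller can
// stop attempting to issue (the diff's issue_allowed_[thread_idx] = false).
bool TryIssue(int* next_channel, std::vector<std::mutex>* locks,
              std::vector<int>* outstanding, int channel_count,
              int max_outstanding_per_channel) {
  for (int attempts = 0; attempts < channel_count; attempts++) {
    int ch = *next_channel;
    *next_channel = (*next_channel + 1) % channel_count;
    std::lock_guard<std::mutex> g((*locks)[ch]);
    if ((*outstanding)[ch] < max_outstanding_per_channel) {
      (*outstanding)[ch]++;  // the real code would also start the RPC here
      return true;
    }
  }
  return false;
}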
class AsyncUnaryClient GRPC_FINAL : public AsyncClient {
@@ -199,6 +271,7 @@ private:
new ClientRpcContextUnaryImpl<SimpleRequest, SimpleResponse>(
stub, req, start_req, check_done);
}
};
template <class RequestType, class ResponseType>
@@ -227,7 +300,7 @@ class ClientRpcContextStreamingImpl : public ClientRpcContext {
void StartNewClone() GRPC_OVERRIDE {
new ClientRpcContextStreamingImpl(stub_, req_, start_req_, callback_);
}
void Start() GRPC_OVERRIDE {}
private:
bool ReqSent(bool ok, Histogram *) {
return StartWrite(ok);