Commit 7b172b24 authored by Vijay Pai

Get the code to stop crashing by fixing bugs

parent dc0615fa
@@ -34,6 +34,7 @@
#include <cassert>
#include <forward_list>
#include <functional>
#include <list>
#include <memory>
#include <mutex>
#include <string>
@@ -57,8 +58,8 @@
namespace grpc {
namespace testing {
typedef std::forward_list<grpc_time> deadline_list;
typedef std::list<grpc_time> deadline_list;
class ClientRpcContext {
public:
ClientRpcContext(int ch): channel_id_(ch) {}
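Note on the typedef change above: each context stores an iterator to its own entry in this deadline list and later needs to erase exactly that entry. std::forward_list only offers erase_after(), which removes the element following the given iterator, whereas std::list::erase() removes the pointed-to element and leaves other iterators valid. A minimal standalone sketch of the stored-iterator pattern; grpc_time is stood in by a chrono time_point and the variable names are illustrative:

#include <cassert>
#include <chrono>
#include <list>

typedef std::chrono::system_clock::time_point grpc_time;  // stand-in for the real typedef
typedef std::list<grpc_time> deadline_list;

int main() {
  deadline_list deadlines;
  deadlines.emplace_back(std::chrono::system_clock::now() + std::chrono::seconds(1));
  deadline_list::iterator posn = --deadlines.end();  // remember where this RPC's entry lives
  deadlines.emplace_back(std::chrono::system_clock::now() + std::chrono::seconds(2));
  deadlines.erase(posn);          // removes exactly the remembered entry
  assert(deadlines.size() == 1);
  return 0;
}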
@@ -72,8 +73,8 @@ class ClientRpcContext {
}
deadline_list::iterator deadline_posn() const {return deadline_posn_;}
void set_deadline_posn(deadline_list::iterator&& it) {deadline_posn_ = it;}
virtual void Start() = 0;
void set_deadline_posn(const deadline_list::iterator& it) {deadline_posn_ = it;}
virtual void Start(CompletionQueue *cq) = 0;
int channel_id() const {return channel_id_;}
protected:
int channel_id_;
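Making Start() take the completion queue as a parameter, rather than a no-argument Start() on a context that already knows its queue, is what lets a context be constructed idle, parked, and bound only to whichever queue belongs to the thread that eventually issues it. A standalone sketch of that shape; CompletionQueue here is a placeholder struct, not the real grpc::CompletionQueue:

#include <cstdio>
#include <list>

struct CompletionQueue { int id; };  // placeholder for the sketch only

class ClientRpcContext {
 public:
  explicit ClientRpcContext(int ch) : channel_id_(ch) {}
  virtual ~ClientRpcContext() {}
  virtual void Start(CompletionQueue* cq) = 0;  // queue chosen at issue time
  int channel_id() const { return channel_id_; }
 protected:
  int channel_id_;
};

class FakeUnaryCtx : public ClientRpcContext {
 public:
  explicit FakeUnaryCtx(int ch) : ClientRpcContext(ch) {}
  void Start(CompletionQueue* cq) override {
    std::printf("channel %d issued on cq %d\n", channel_id(), cq->id);
  }
};

int main() {
  CompletionQueue cqs[2] = {{0}, {1}};
  std::list<ClientRpcContext*> idle;     // contexts parked with no queue attached yet
  idle.push_back(new FakeUnaryCtx(7));
  ClientRpcContext* ctx = idle.front();  // later, an issuing thread picks its own queue
  idle.pop_front();
  ctx->Start(&cqs[1]);
  delete ctx;
  return 0;
}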
@@ -88,7 +89,8 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext {
TestService::Stub* stub, const RequestType& req,
std::function<
std::unique_ptr<grpc::ClientAsyncResponseReader<ResponseType>>(
TestService::Stub*, grpc::ClientContext*, const RequestType&)>
TestService::Stub*, grpc::ClientContext*, const RequestType&,
CompletionQueue*)>
start_req,
std::function<void(grpc::Status, ResponseType*)> on_done)
: ClientRpcContext(channel_id), context_(),
@@ -99,9 +101,9 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext {
callback_(on_done),
start_req_(start_req) {
}
void Start() GRPC_OVERRIDE {
void Start(CompletionQueue *cq) GRPC_OVERRIDE {
start_ = Timer::Now();
response_reader_ = start_req_(stub_, &context_, req_);
response_reader_ = start_req_(stub_, &context_, req_, cq);
response_reader_->Finish(&response_, &status_, ClientRpcContext::tag(this));
}
~ClientRpcContextUnaryImpl() GRPC_OVERRIDE {}
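In the updated Start() above, the context attaches itself to the operation by passing ClientRpcContext::tag(this) to Finish(); when the completion queue later returns that tag, the worker thread recovers the context with detag() and advances its state machine. The sketch below assumes tag/detag are nothing more than a cast to void* and back, which is how this pairing is usually done; the class and method names are illustrative:

#include <cassert>
#include <cstdio>

class Ctx {
 public:
  void* tag() { return static_cast<void*>(this); }             // attached to the async operation
  static Ctx* detag(void* t) { return static_cast<Ctx*>(t); }  // recovered from the cq event
  bool RunNextState(bool ok) {
    std::printf("event ok=%d, advancing state machine\n", ok ? 1 : 0);
    return true;
  }
};

int main() {
  Ctx ctx;
  void* got_tag = ctx.tag();   // pretend this came back from cq->Next(&got_tag, &ok)
  bool ok = true;
  Ctx::detag(got_tag)->RunNextState(ok);
  assert(Ctx::detag(got_tag) == &ctx);
  return 0;
}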
@@ -115,7 +117,7 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext {
ClientRpcContext* StartNewClone() GRPC_OVERRIDE {
return new ClientRpcContextUnaryImpl(channel_id_,
stub_, req_, start_req_, callback_);
stub_, req_, start_req_, callback_);
}
private:
@@ -123,9 +125,9 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext {
next_state_ = &ClientRpcContextUnaryImpl::DoCallBack;
return false;
}
bool DoCallBack(bool) {
bool DoCallBack (bool) {
callback_(status_, &response_);
return false;
return true; // we're done, this'll be ignored
}
grpc::ClientContext context_;
TestService::Stub* stub_;
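The unary context above is a small state machine driven through a pointer to member function: RunNextState() calls whatever next_state_ points at, the preceding state (visible above only as the assignment of DoCallBack to next_state_) hands control to DoCallBack, and DoCallBack runs the user callback. Since the completion loop now clones and deletes the context itself, DoCallBack's return value no longer drives deletion, hence the "this'll be ignored" comment. A standalone sketch of the dispatch idiom, with invented state names:

#include <cstdio>

class UnaryCtx {
 public:
  UnaryCtx() : next_state_(&UnaryCtx::RespDone) {}
  bool RunNextState(bool ok) { return (this->*next_state_)(ok); }

 private:
  bool RespDone(bool) {
    std::printf("response arrived; run the callback on the next event\n");
    next_state_ = &UnaryCtx::DoCallBack;
    return false;
  }
  bool DoCallBack(bool) {
    std::printf("user callback fired\n");
    return true;  // informational only; the caller clones/deletes regardless
  }
  bool (UnaryCtx::*next_state_)(bool);  // the member function to run on the next event
};

int main() {
  UnaryCtx ctx;
  ctx.RunNextState(true);  // RespDone
  ctx.RunNextState(true);  // DoCallBack
  return 0;
}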
@@ -134,7 +136,8 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext {
bool (ClientRpcContextUnaryImpl::*next_state_)(bool);
std::function<void(grpc::Status, ResponseType*)> callback_;
std::function<std::unique_ptr<grpc::ClientAsyncResponseReader<ResponseType>>(
TestService::Stub*, grpc::ClientContext*, const RequestType&)> start_req_;
TestService::Stub*, grpc::ClientContext*,
const RequestType&, CompletionQueue *)> start_req_;
grpc::Status status_;
double start_;
std::unique_ptr<grpc::ClientAsyncResponseReader<ResponseType>>
@@ -146,11 +149,11 @@ typedef std::forward_list<ClientRpcContext *> context_list;
class AsyncClient : public Client {
public:
explicit AsyncClient(const ClientConfig& config,
std::function<ClientRpcContext*(int, CompletionQueue*, TestService::Stub*,
std::function<ClientRpcContext*(int, TestService::Stub*,
const SimpleRequest&)> setup_ctx) :
Client(config), channel_lock_(config.client_channels()),
max_outstanding_per_channel_(config.outstanding_rpcs_per_channel()),
contexts_(config.client_channels()),
max_outstanding_per_channel_(config.outstanding_rpcs_per_channel()),
channel_count_(config.client_channels()) {
SetupLoadTest(config, config.async_client_threads());
@@ -169,8 +172,8 @@ class AsyncClient : public Client {
}
if (!closed_loop_) {
for (auto channel = channels_.begin(); channel != channels_.end();
channel++) {
rpcs_outstanding_.push_back(0);
channel++) {
rpcs_outstanding_.push_back(0);
}
}
@@ -178,14 +181,12 @@ class AsyncClient : public Client {
for (int i = 0; i < config.outstanding_rpcs_per_channel(); i++) {
for (int ch = 0; ch < channel_count_; ch++) {
auto& channel = channels_[ch];
auto* cq = cli_cqs_[t].get();
t = (t + 1) % cli_cqs_.size();
auto ctx = setup_ctx(ch, cq, channel.get_stub(), request_);
if (closed_loop_) {
// only relevant for closed_loop unary, but harmless for
// closed_loop streaming
ctx->Start();
}
auto* cq = cli_cqs_[t].get();
t = (t + 1) % cli_cqs_.size();
auto ctx = setup_ctx(ch, channel.get_stub(), request_);
if (closed_loop_) {
ctx->Start(cq);
}
else {
contexts_[ch].push_front(ctx);
}
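Setup still spreads contexts round-robin over the per-thread completion queues, but only a closed-loop run starts each context immediately; an open-loop run parks it on its channel's idle list until the load generator decides an RPC should go out. A standalone sketch of that round-robin-and-park step; the container and type names are invented for the sketch:

#include <cstdio>
#include <list>
#include <vector>

struct Ctx { int channel; };

int main() {
  const int kChannels = 3, kCqs = 2, kRpcsPerChannel = 2;
  const bool closed_loop = false;

  std::vector<std::list<Ctx*>> idle(kChannels);  // per-channel idle contexts

  int t = 0;
  for (int i = 0; i < kRpcsPerChannel; i++) {
    for (int ch = 0; ch < kChannels; ch++) {
      int cq = t;                  // this context's completion queue
      t = (t + 1) % kCqs;          // round-robin the queues across contexts
      Ctx* ctx = new Ctx{ch};
      if (closed_loop) {
        std::printf("start context for channel %d on cq %d now\n", ch, cq);
        delete ctx;                // sketch only; the real client keeps it alive
      } else {
        idle[ch].push_front(ctx);  // park until the load generator issues it
      }
    }
  }
  std::printf("parked %zu contexts on channel 0\n", idle[0].size());
  for (auto& l : idle) { for (Ctx* c : l) delete c; }
  return 0;
}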
@@ -238,28 +239,36 @@ class AsyncClient : public Client {
}
if ((closed_loop_ || !rpc_deadlines_[thread_idx].empty()) &&
grpc_time_source::now() > deadline) {
// we have missed some 1-second deadline, which is too much gpr_log(GPR_INFO, "Missed an RPC deadline, giving up");
// we have missed some 1-second deadline, which is too much
gpr_log(GPR_INFO, "Missed an RPC deadline, giving up");
return false;
}
if (got_event) {
ClientRpcContext* ctx = ClientRpcContext::detag(got_tag);
if (ctx->RunNextState(ok, histogram) == false) {
// call the callback and then delete it
rpc_deadlines_[thread_idx].erase_after(ctx->deadline_posn());
// call the callback and then clone the ctx
ctx->RunNextState(ok, histogram);
ClientRpcContext *clone_ctx = ctx->StartNewClone();
delete ctx;
if (!closed_loop_) {
// Put this in the list of idle contexts for this channel
if (closed_loop_) {
clone_ctx->Start(cli_cqs_[thread_idx].get());
}
else {
// Remove the entry from the rpc deadlines list
rpc_deadlines_[thread_idx].erase(ctx->deadline_posn());
// Put the clone_ctx in the list of idle contexts for this channel
// Under lock
int ch = clone_ctx->channel_id();
std::lock_guard<std::mutex> g(channel_lock_[ch]);
contexts_[ch].push_front(ctx);
rpcs_outstanding_[ch]--;
contexts_[ch].push_front(clone_ctx);
}
// delete the old version
delete ctx;
}
issue_allowed_[thread_idx] = true; // may be ok now even if it hadn't been
if (!closed_loop_)
issue_allowed_[thread_idx] = true; // may be ok now even if it hadn't been
}
if (issue_allowed_[thread_idx] &&
if (!closed_loop_ && issue_allowed_[thread_idx] &&
grpc_time_source::now() >= next_issue_[thread_idx]) {
// Attempt to issue
bool issued = false;
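The crash fix in the hunk above is in how a finished RPC is recycled: the context is cloned first, the clone is what gets restarted (closed loop) or parked on the channel's idle list after its stored deadline entry is erased with list::erase (open loop), and only then is the original deleted. Previously the idle list received push_front(ctx), the very context being torn down, and the deadline entry was removed with forward_list::erase_after on the stored position. A standalone sketch of the clone-then-delete lifecycle, with placeholder types:

#include <cstdio>
#include <list>

class Ctx {
 public:
  explicit Ctx(int ch) : channel_(ch) {}
  Ctx* StartNewClone() const { return new Ctx(channel_); }  // fresh context, same parameters
  void Start() { std::printf("restarted on channel %d\n", channel_); }
 private:
  int channel_;
};

int main() {
  const bool closed_loop = false;
  std::list<Ctx*> idle;            // stands in for the per-channel contexts_ list

  Ctx* ctx = new Ctx(4);           // pretend this context just completed
  Ctx* clone_ctx = ctx->StartNewClone();
  if (closed_loop) {
    clone_ctx->Start();            // keep the pipeline full immediately
  } else {
    idle.push_front(clone_ctx);    // park the clone, never the finished context
  }
  delete ctx;                      // the old version is freed only after the clone exists

  for (Ctx* c : idle) delete c;
  return 0;
}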
@@ -273,16 +282,27 @@ class AsyncClient : public Client {
max_outstanding_per_channel_) &&
!contexts_[next_channel_[thread_idx]].empty()) {
// Get an idle context from the front of the list
auto ctx = contexts_[next_channel_[thread_idx]].begin();
auto ctx = *(contexts_[next_channel_[thread_idx]].begin());
contexts_[next_channel_[thread_idx]].pop_front();
// do the work to issue
(*ctx)->Start();
rpc_deadlines_[thread_idx].emplace_back(
grpc_time_source::now() + std::chrono::seconds(1));
auto it = rpc_deadlines_[thread_idx].end();
--it;
ctx->set_deadline_posn(it);
ctx->Start(cli_cqs_[thread_idx].get());
rpcs_outstanding_[next_channel_[thread_idx]]++;
issued = true;
}
}
if (!issued)
if (issued) {
grpc_time next_issue;
NextIssueTime(thread_idx, &next_issue);
next_issue_[thread_idx]=next_issue;
}
else {
issue_allowed_[thread_idx] = false;
}
}
return true;
}
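After a successful issue the thread asks NextIssueTime for the next send time and records it in next_issue_; when nothing can be issued it clears issue_allowed_ until a completion frees capacity, instead of spinning. A minimal pacing sketch with a fixed interarrival gap standing in for whatever NextIssueTime computes (its behavior is not shown in this diff):

#include <chrono>
#include <cstdio>
#include <thread>

int main() {
  using clock = std::chrono::steady_clock;
  const auto interarrival = std::chrono::milliseconds(50);  // stand-in for NextIssueTime()
  auto next_issue = clock::now();
  bool issue_allowed = true;

  int issued = 0;
  while (issued < 5) {
    if (issue_allowed && clock::now() >= next_issue) {
      std::printf("issue rpc %d\n", issued++);
      next_issue = clock::now() + interarrival;  // schedule the next send
      // if no idle context had been available, we would set issue_allowed = false here
      // and rely on the next completion event to set it back to true
    }
    std::this_thread::sleep_for(std::chrono::milliseconds(1));
  }
  return 0;
}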
@@ -311,12 +331,11 @@ class AsyncUnaryClient GRPC_FINAL : public AsyncClient {
~AsyncUnaryClient() GRPC_OVERRIDE { EndThreads(); }
private:
static ClientRpcContext *SetupCtx(int channel_id,
CompletionQueue* cq,
TestService::Stub* stub,
const SimpleRequest& req) {
auto check_done = [](grpc::Status s, SimpleResponse* response) {};
auto start_req = [cq](TestService::Stub* stub, grpc::ClientContext* ctx,
const SimpleRequest& request) {
auto start_req = [](TestService::Stub* stub, grpc::ClientContext* ctx,
const SimpleRequest& request, CompletionQueue* cq) {
return stub->AsyncUnaryCall(ctx, request, cq);
};
return new ClientRpcContextUnaryImpl<SimpleRequest,
@@ -333,7 +352,8 @@ class ClientRpcContextStreamingImpl : public ClientRpcContext {
TestService::Stub* stub, const RequestType& req,
std::function<std::unique_ptr<
grpc::ClientAsyncReaderWriter<RequestType, ResponseType>>(
TestService::Stub*, grpc::ClientContext*, void*)> start_req,
TestService::Stub*, grpc::ClientContext*, CompletionQueue*,
void*)> start_req,
std::function<void(grpc::Status, ResponseType*)> on_done)
: ClientRpcContext(channel_id),
context_(),
@@ -343,8 +363,7 @@ class ClientRpcContextStreamingImpl : public ClientRpcContext {
next_state_(&ClientRpcContextStreamingImpl::ReqSent),
callback_(on_done),
start_req_(start_req),
start_(Timer::Now()),
stream_(start_req_(stub_, &context_, ClientRpcContext::tag(this))) {}
start_(Timer::Now()) {}
~ClientRpcContextStreamingImpl() GRPC_OVERRIDE {}
bool RunNextState(bool ok, Histogram* hist) GRPC_OVERRIDE {
return (this->*next_state_)(ok, hist);
@@ -353,7 +372,9 @@ class ClientRpcContextStreamingImpl : public ClientRpcContext {
return new ClientRpcContextStreamingImpl(channel_id_,
stub_, req_, start_req_, callback_);
}
void Start() GRPC_OVERRIDE {}
void Start(CompletionQueue *cq) GRPC_OVERRIDE {
stream_ = start_req_(stub_, &context_, cq, ClientRpcContext::tag(this));
}
private:
bool ReqSent(bool ok, Histogram*) { return StartWrite(ok); }
bool StartWrite(bool ok) {
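For the streaming context the commit moves stream creation out of the constructor's member-initializer list and into Start(cq): a freshly constructed (or cloned) streaming context now does nothing on the wire until a worker thread starts it on its own completion queue. A standalone sketch of construct-idle / start-explicitly, using placeholder types rather than the real gRPC classes:

#include <cstdio>
#include <memory>

struct CompletionQueue { int id; };   // placeholder for the sketch
struct FakeStream { int cq_id; };     // placeholder for the async reader/writer

class StreamingCtx {
 public:
  StreamingCtx() {}                   // nothing is started at construction anymore
  void Start(CompletionQueue* cq) {
    stream_.reset(new FakeStream{cq->id});  // the stream exists only once Start() runs
    std::printf("stream opened on cq %d\n", stream_->cq_id);
  }
 private:
  std::unique_ptr<FakeStream> stream_;
};

int main() {
  CompletionQueue cq{3};
  StreamingCtx ctx;        // safe to construct, clone, or park: no RPC has begun
  ctx.Start(&cq);          // the call starts only when a worker thread decides
  return 0;
}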
@@ -385,7 +406,8 @@ class ClientRpcContextStreamingImpl : public ClientRpcContext {
std::function<void(grpc::Status, ResponseType*)> callback_;
std::function<
std::unique_ptr<grpc::ClientAsyncReaderWriter<RequestType, ResponseType>>(
TestService::Stub*, grpc::ClientContext*, void*)> start_req_;
TestService::Stub*, grpc::ClientContext*,
CompletionQueue *, void*)> start_req_;
grpc::Status status_;
double start_;
std::unique_ptr<grpc::ClientAsyncReaderWriter<RequestType, ResponseType>>
@@ -396,17 +418,20 @@ class AsyncStreamingClient GRPC_FINAL : public AsyncClient {
public:
explicit AsyncStreamingClient(const ClientConfig& config)
: AsyncClient(config, SetupCtx) {
// async streaming currently only supported closed loop
GPR_ASSERT(config.load_type() == CLOSED_LOOP);
StartThreads(config.async_client_threads());
}
~AsyncStreamingClient() GRPC_OVERRIDE { EndThreads(); }
private:
static ClientRpcContext *SetupCtx(int channel_id,
CompletionQueue* cq, TestService::Stub* stub,
const SimpleRequest& req) {
TestService::Stub* stub,
const SimpleRequest& req) {
auto check_done = [](grpc::Status s, SimpleResponse* response) {};
auto start_req = [cq](TestService::Stub* stub, grpc::ClientContext* ctx,
void* tag) {
auto start_req = [](TestService::Stub* stub, grpc::ClientContext* ctx,
CompletionQueue *cq, void* tag) {
auto stream = stub->AsyncStreamingCall(ctx, cq, tag);
return stream;
};