Skip to content
Snippets Groups Projects
Commit ef638390 authored by Craig Tiller's avatar Craig Tiller
Browse files

Async client works

parent d1e18fae
No related branches found
No related tags found
No related merge requests found
...@@ -60,6 +60,7 @@ class ClientRpcContext { ...@@ -60,6 +60,7 @@ class ClientRpcContext {
ClientRpcContext() {} ClientRpcContext() {}
virtual ~ClientRpcContext() {} virtual ~ClientRpcContext() {}
virtual bool RunNextState() = 0; // do next state, return false if steps done virtual bool RunNextState() = 0; // do next state, return false if steps done
virtual void StartNewClone() = 0;
static void *tag(ClientRpcContext *c) { return reinterpret_cast<void *>(c); } static void *tag(ClientRpcContext *c) { return reinterpret_cast<void *>(c); }
static ClientRpcContext *detag(void *t) { static ClientRpcContext *detag(void *t) {
return reinterpret_cast<ClientRpcContext *>(t); return reinterpret_cast<ClientRpcContext *>(t);
...@@ -83,7 +84,7 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext { ...@@ -83,7 +84,7 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext {
req_(req), req_(req),
response_(), response_(),
next_state_(&ClientRpcContextUnaryImpl::ReqSent), next_state_(&ClientRpcContextUnaryImpl::ReqSent),
callback_(on_done), callback_(on_done), start_req_(start_req),
start_(Timer::Now()), start_(Timer::Now()),
response_reader_( response_reader_(
start_req(stub_, &context_, req_, ClientRpcContext::tag(this))) {} start_req(stub_, &context_, req_, ClientRpcContext::tag(this))) {}
...@@ -93,6 +94,10 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext { ...@@ -93,6 +94,10 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext {
hist->Add((Timer::Now() - start_) * 1e9); hist->Add((Timer::Now() - start_) * 1e9);
} }
void StartNewClone() {
new ClientRpcContextUnaryImpl(stub_, req_, start_req_, callback_);
}
private: private:
bool ReqSent() { bool ReqSent() {
next_state_ = &ClientRpcContextUnaryImpl::RespDone; next_state_ = &ClientRpcContextUnaryImpl::RespDone;
...@@ -113,6 +118,10 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext { ...@@ -113,6 +118,10 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext {
ResponseType response_; ResponseType response_;
bool (ClientRpcContextUnaryImpl::*next_state_)(); bool (ClientRpcContextUnaryImpl::*next_state_)();
std::function<void(grpc::Status, ResponseType *)> callback_; std::function<void(grpc::Status, ResponseType *)> callback_;
std::function<
std::unique_ptr<grpc::ClientAsyncResponseReader<ResponseType>>(
TestService::Stub *, grpc::ClientContext *, const RequestType &,
void *)> start_req_;
grpc::Status status_; grpc::Status status_;
double start_; double start_;
std::unique_ptr<grpc::ClientAsyncResponseReader<ResponseType>> std::unique_ptr<grpc::ClientAsyncResponseReader<ResponseType>>
...@@ -152,6 +161,19 @@ class AsyncClient GRPC_FINAL : public Client { ...@@ -152,6 +161,19 @@ class AsyncClient GRPC_FINAL : public Client {
StartThreads(config.async_client_threads()); StartThreads(config.async_client_threads());
} }
~AsyncClient() GRPC_OVERRIDE {
EndThreads();
for (auto& cq : cli_cqs_) {
cq->Shutdown();
void *got_tag;
bool ok;
while (cq->Next(&got_tag, &ok)) {
delete ClientRpcContext::detag(got_tag);
}
}
}
void ThreadFunc(Histogram *histogram, size_t thread_idx) { void ThreadFunc(Histogram *histogram, size_t thread_idx) {
void *got_tag; void *got_tag;
bool ok; bool ok;
...@@ -162,6 +184,7 @@ class AsyncClient GRPC_FINAL : public Client { ...@@ -162,6 +184,7 @@ class AsyncClient GRPC_FINAL : public Client {
// call the callback and then delete it // call the callback and then delete it
ctx->report_stats(histogram); ctx->report_stats(histogram);
ctx->RunNextState(); ctx->RunNextState();
ctx->StartNewClone();
delete ctx; delete ctx;
} }
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment