Skip to content
Snippets Groups Projects
Commit 5c8737d1 authored by Craig Tiller's avatar Craig Tiller
Browse files

clang-format qps code - it was getting out of hand

parent 76e0fb1d
No related branches found
No related tags found
No related merge requests found
...@@ -113,24 +113,26 @@ class Client { ...@@ -113,24 +113,26 @@ class Client {
: done_(false), : done_(false),
new_(nullptr), new_(nullptr),
impl_([this, idx, client]() { impl_([this, idx, client]() {
for (;;) { for (;;) {
// run the loop body // run the loop body
bool thread_still_ok = client->ThreadFunc(&histogram_, idx); bool thread_still_ok = client->ThreadFunc(&histogram_, idx);
// lock, see if we're done // lock, see if we're done
std::lock_guard<std::mutex> g(mu_); std::lock_guard<std::mutex> g(mu_);
if (!thread_still_ok) { if (!thread_still_ok) {
gpr_log(GPR_ERROR, "Finishing client thread due to RPC error"); gpr_log(GPR_ERROR, "Finishing client thread due to RPC error");
done_ = true; done_ = true;
}
if (done_) {return;}
// check if we're marking, swap out the histogram if so
if (new_) {
new_->Swap(&histogram_);
new_ = nullptr;
cv_.notify_one();
}
} }
}) {} if (done_) {
return;
}
// check if we're marking, swap out the histogram if so
if (new_) {
new_->Swap(&histogram_);
new_ = nullptr;
cv_.notify_one();
}
}
}) {}
~Thread() { ~Thread() {
{ {
...@@ -168,10 +170,9 @@ class Client { ...@@ -168,10 +170,9 @@ class Client {
std::unique_ptr<Timer> timer_; std::unique_ptr<Timer> timer_;
}; };
std::unique_ptr<Client> std::unique_ptr<Client> CreateSynchronousUnaryClient(const ClientConfig& args);
CreateSynchronousUnaryClient(const ClientConfig& args); std::unique_ptr<Client> CreateSynchronousStreamingClient(
std::unique_ptr<Client> const ClientConfig& args);
CreateSynchronousStreamingClient(const ClientConfig& args);
std::unique_ptr<Client> CreateAsyncUnaryClient(const ClientConfig& args); std::unique_ptr<Client> CreateAsyncUnaryClient(const ClientConfig& args);
std::unique_ptr<Client> CreateAsyncStreamingClient(const ClientConfig& args); std::unique_ptr<Client> CreateAsyncStreamingClient(const ClientConfig& args);
......
...@@ -128,16 +128,16 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext { ...@@ -128,16 +128,16 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext {
class AsyncClient : public Client { class AsyncClient : public Client {
public: public:
explicit AsyncClient(const ClientConfig& config, explicit AsyncClient(const ClientConfig& config,
std::function<void(CompletionQueue*, TestService::Stub*, std::function<void(CompletionQueue*, TestService::Stub*,
const SimpleRequest&)> setup_ctx) : const SimpleRequest&)> setup_ctx)
Client(config) { : Client(config) {
for (int i = 0; i < config.async_client_threads(); i++) { for (int i = 0; i < config.async_client_threads(); i++) {
cli_cqs_.emplace_back(new CompletionQueue); cli_cqs_.emplace_back(new CompletionQueue);
} }
int t = 0; int t = 0;
for (int i = 0; i < config.outstanding_rpcs_per_channel(); i++) { for (int i = 0; i < config.outstanding_rpcs_per_channel(); i++) {
for (auto channel = channels_.begin(); channel != channels_.end(); for (auto channel = channels_.begin(); channel != channels_.end();
channel++) { channel++) {
auto* cq = cli_cqs_[t].get(); auto* cq = cli_cqs_[t].get();
t = (t + 1) % cli_cqs_.size(); t = (t + 1) % cli_cqs_.size();
setup_ctx(cq, channel->get_stub(), request_); setup_ctx(cq, channel->get_stub(), request_);
...@@ -155,16 +155,19 @@ class AsyncClient : public Client { ...@@ -155,16 +155,19 @@ class AsyncClient : public Client {
} }
} }
bool ThreadFunc(Histogram* histogram, size_t thread_idx) bool ThreadFunc(Histogram* histogram,
GRPC_OVERRIDE GRPC_FINAL { size_t thread_idx) GRPC_OVERRIDE GRPC_FINAL {
void* got_tag; void* got_tag;
bool ok; bool ok;
switch (cli_cqs_[thread_idx]->AsyncNext(&got_tag, &ok, switch (cli_cqs_[thread_idx]->AsyncNext(
std::chrono::system_clock::now() + &got_tag, &ok,
std::chrono::seconds(1))) { std::chrono::system_clock::now() + std::chrono::seconds(1))) {
case CompletionQueue::SHUTDOWN: return false; case CompletionQueue::SHUTDOWN:
case CompletionQueue::TIMEOUT: return true; return false;
case CompletionQueue::GOT_EVENT: break; case CompletionQueue::TIMEOUT:
return true;
case CompletionQueue::GOT_EVENT:
break;
} }
ClientRpcContext* ctx = ClientRpcContext::detag(got_tag); ClientRpcContext* ctx = ClientRpcContext::detag(got_tag);
...@@ -177,18 +180,20 @@ class AsyncClient : public Client { ...@@ -177,18 +180,20 @@ class AsyncClient : public Client {
return true; return true;
} }
private: private:
std::vector<std::unique_ptr<CompletionQueue>> cli_cqs_; std::vector<std::unique_ptr<CompletionQueue>> cli_cqs_;
}; };
class AsyncUnaryClient GRPC_FINAL : public AsyncClient { class AsyncUnaryClient GRPC_FINAL : public AsyncClient {
public: public:
explicit AsyncUnaryClient(const ClientConfig& config) : explicit AsyncUnaryClient(const ClientConfig& config)
AsyncClient(config, SetupCtx) { : AsyncClient(config, SetupCtx) {
StartThreads(config.async_client_threads()); StartThreads(config.async_client_threads());
} }
~AsyncUnaryClient() GRPC_OVERRIDE { EndThreads(); } ~AsyncUnaryClient() GRPC_OVERRIDE { EndThreads(); }
private:
private:
static void SetupCtx(CompletionQueue* cq, TestService::Stub* stub, static void SetupCtx(CompletionQueue* cq, TestService::Stub* stub,
const SimpleRequest& req) { const SimpleRequest& req) {
auto check_done = [](grpc::Status s, SimpleResponse* response) {}; auto check_done = [](grpc::Status s, SimpleResponse* response) {};
...@@ -205,12 +210,11 @@ template <class RequestType, class ResponseType> ...@@ -205,12 +210,11 @@ template <class RequestType, class ResponseType>
class ClientRpcContextStreamingImpl : public ClientRpcContext { class ClientRpcContextStreamingImpl : public ClientRpcContext {
public: public:
ClientRpcContextStreamingImpl( ClientRpcContextStreamingImpl(
TestService::Stub *stub, const RequestType &req, TestService::Stub* stub, const RequestType& req,
std::function< std::function<std::unique_ptr<
std::unique_ptr<grpc::ClientAsyncReaderWriter< grpc::ClientAsyncReaderWriter<RequestType, ResponseType>>(
RequestType,ResponseType>>( TestService::Stub*, grpc::ClientContext*, void*)> start_req,
TestService::Stub *, grpc::ClientContext *, void *)> start_req, std::function<void(grpc::Status, ResponseType*)> on_done)
std::function<void(grpc::Status, ResponseType *)> on_done)
: context_(), : context_(),
stub_(stub), stub_(stub),
req_(req), req_(req),
...@@ -221,7 +225,7 @@ class ClientRpcContextStreamingImpl : public ClientRpcContext { ...@@ -221,7 +225,7 @@ class ClientRpcContextStreamingImpl : public ClientRpcContext {
start_(Timer::Now()), start_(Timer::Now()),
stream_(start_req_(stub_, &context_, ClientRpcContext::tag(this))) {} stream_(start_req_(stub_, &context_, ClientRpcContext::tag(this))) {}
~ClientRpcContextStreamingImpl() GRPC_OVERRIDE {} ~ClientRpcContextStreamingImpl() GRPC_OVERRIDE {}
bool RunNextState(bool ok, Histogram *hist) GRPC_OVERRIDE { bool RunNextState(bool ok, Histogram* hist) GRPC_OVERRIDE {
return (this->*next_state_)(ok, hist); return (this->*next_state_)(ok, hist);
} }
void StartNewClone() GRPC_OVERRIDE { void StartNewClone() GRPC_OVERRIDE {
...@@ -229,59 +233,58 @@ class ClientRpcContextStreamingImpl : public ClientRpcContext { ...@@ -229,59 +233,58 @@ class ClientRpcContextStreamingImpl : public ClientRpcContext {
} }
private: private:
bool ReqSent(bool ok, Histogram *) { bool ReqSent(bool ok, Histogram*) { return StartWrite(ok); }
return StartWrite(ok);
}
bool StartWrite(bool ok) { bool StartWrite(bool ok) {
if (!ok) { if (!ok) {
return(false); return (false);
} }
start_ = Timer::Now(); start_ = Timer::Now();
next_state_ = &ClientRpcContextStreamingImpl::WriteDone; next_state_ = &ClientRpcContextStreamingImpl::WriteDone;
stream_->Write(req_, ClientRpcContext::tag(this)); stream_->Write(req_, ClientRpcContext::tag(this));
return true; return true;
} }
bool WriteDone(bool ok, Histogram *) { bool WriteDone(bool ok, Histogram*) {
if (!ok) { if (!ok) {
return(false); return (false);
} }
next_state_ = &ClientRpcContextStreamingImpl::ReadDone; next_state_ = &ClientRpcContextStreamingImpl::ReadDone;
stream_->Read(&response_, ClientRpcContext::tag(this)); stream_->Read(&response_, ClientRpcContext::tag(this));
return true; return true;
} }
bool ReadDone(bool ok, Histogram *hist) { bool ReadDone(bool ok, Histogram* hist) {
hist->Add((Timer::Now() - start_) * 1e9); hist->Add((Timer::Now() - start_) * 1e9);
return StartWrite(ok); return StartWrite(ok);
} }
grpc::ClientContext context_; grpc::ClientContext context_;
TestService::Stub *stub_; TestService::Stub* stub_;
RequestType req_; RequestType req_;
ResponseType response_; ResponseType response_;
bool (ClientRpcContextStreamingImpl::*next_state_)(bool, Histogram *); bool (ClientRpcContextStreamingImpl::*next_state_)(bool, Histogram*);
std::function<void(grpc::Status, ResponseType *)> callback_; std::function<void(grpc::Status, ResponseType*)> callback_;
std::function<std::unique_ptr<grpc::ClientAsyncReaderWriter< std::function<
RequestType,ResponseType>>( std::unique_ptr<grpc::ClientAsyncReaderWriter<RequestType, ResponseType>>(
TestService::Stub *, grpc::ClientContext *, void *)> start_req_; TestService::Stub*, grpc::ClientContext*, void*)> start_req_;
grpc::Status status_; grpc::Status status_;
double start_; double start_;
std::unique_ptr<grpc::ClientAsyncReaderWriter<RequestType,ResponseType>> std::unique_ptr<grpc::ClientAsyncReaderWriter<RequestType, ResponseType>>
stream_; stream_;
}; };
class AsyncStreamingClient GRPC_FINAL : public AsyncClient { class AsyncStreamingClient GRPC_FINAL : public AsyncClient {
public: public:
explicit AsyncStreamingClient(const ClientConfig &config) : explicit AsyncStreamingClient(const ClientConfig& config)
AsyncClient(config, SetupCtx) { : AsyncClient(config, SetupCtx) {
StartThreads(config.async_client_threads()); StartThreads(config.async_client_threads());
} }
~AsyncStreamingClient() GRPC_OVERRIDE { EndThreads(); } ~AsyncStreamingClient() GRPC_OVERRIDE { EndThreads(); }
private:
private:
static void SetupCtx(CompletionQueue* cq, TestService::Stub* stub, static void SetupCtx(CompletionQueue* cq, TestService::Stub* stub,
const SimpleRequest& req) { const SimpleRequest& req) {
auto check_done = [](grpc::Status s, SimpleResponse* response) {}; auto check_done = [](grpc::Status s, SimpleResponse* response) {};
auto start_req = [cq](TestService::Stub *stub, grpc::ClientContext *ctx, auto start_req = [cq](TestService::Stub* stub, grpc::ClientContext* ctx,
void *tag) { void* tag) {
auto stream = stub->AsyncStreamingCall(ctx, cq, tag); auto stream = stub->AsyncStreamingCall(ctx, cq, tag);
return stream; return stream;
}; };
......
...@@ -99,7 +99,9 @@ class SynchronousUnaryClient GRPC_FINAL : public SynchronousClient { ...@@ -99,7 +99,9 @@ class SynchronousUnaryClient GRPC_FINAL : public SynchronousClient {
class SynchronousStreamingClient GRPC_FINAL : public SynchronousClient { class SynchronousStreamingClient GRPC_FINAL : public SynchronousClient {
public: public:
SynchronousStreamingClient(const ClientConfig& config) SynchronousStreamingClient(const ClientConfig& config)
: SynchronousClient(config), context_(num_threads_), stream_(num_threads_) { : SynchronousClient(config),
context_(num_threads_),
stream_(num_threads_) {
for (size_t thread_idx = 0; thread_idx < num_threads_; thread_idx++) { for (size_t thread_idx = 0; thread_idx < num_threads_; thread_idx++) {
auto* stub = channels_[thread_idx % channels_.size()].get_stub(); auto* stub = channels_[thread_idx % channels_.size()].get_stub();
stream_[thread_idx] = stub->StreamingCall(&context_[thread_idx]); stream_[thread_idx] = stub->StreamingCall(&context_[thread_idx]);
...@@ -110,8 +112,8 @@ class SynchronousStreamingClient GRPC_FINAL : public SynchronousClient { ...@@ -110,8 +112,8 @@ class SynchronousStreamingClient GRPC_FINAL : public SynchronousClient {
EndThreads(); EndThreads();
for (auto stream = stream_.begin(); stream != stream_.end(); stream++) { for (auto stream = stream_.begin(); stream != stream_.end(); stream++) {
if (*stream) { if (*stream) {
(*stream)->WritesDone(); (*stream)->WritesDone();
EXPECT_TRUE((*stream)->Finish().IsOk()); EXPECT_TRUE((*stream)->Finish().IsOk());
} }
} }
} }
...@@ -119,7 +121,7 @@ class SynchronousStreamingClient GRPC_FINAL : public SynchronousClient { ...@@ -119,7 +121,7 @@ class SynchronousStreamingClient GRPC_FINAL : public SynchronousClient {
bool ThreadFunc(Histogram* histogram, size_t thread_idx) GRPC_OVERRIDE { bool ThreadFunc(Histogram* histogram, size_t thread_idx) GRPC_OVERRIDE {
double start = Timer::Now(); double start = Timer::Now();
if (stream_[thread_idx]->Write(request_) && if (stream_[thread_idx]->Write(request_) &&
stream_[thread_idx]->Read(&responses_[thread_idx])) { stream_[thread_idx]->Read(&responses_[thread_idx])) {
histogram->Add((Timer::Now() - start) * 1e9); histogram->Add((Timer::Now() - start) * 1e9);
return true; return true;
} }
...@@ -128,8 +130,8 @@ class SynchronousStreamingClient GRPC_FINAL : public SynchronousClient { ...@@ -128,8 +130,8 @@ class SynchronousStreamingClient GRPC_FINAL : public SynchronousClient {
private: private:
std::vector<grpc::ClientContext> context_; std::vector<grpc::ClientContext> context_;
std::vector<std::unique_ptr<grpc::ClientReaderWriter< std::vector<std::unique_ptr<
SimpleRequest, SimpleResponse>>> stream_; grpc::ClientReaderWriter<SimpleRequest, SimpleResponse>>> stream_;
}; };
std::unique_ptr<Client> CreateSynchronousUnaryClient( std::unique_ptr<Client> CreateSynchronousUnaryClient(
......
...@@ -105,7 +105,7 @@ std::unique_ptr<ScenarioResult> RunScenario( ...@@ -105,7 +105,7 @@ std::unique_ptr<ScenarioResult> RunScenario(
if (!called_init) { if (!called_init) {
char args_buf[100]; char args_buf[100];
strcpy(args_buf, "some-benchmark"); strcpy(args_buf, "some-benchmark");
char *args[] = {args_buf}; char* args[] = {args_buf};
grpc_test_init(1, args); grpc_test_init(1, args);
called_init = true; called_init = true;
} }
...@@ -211,7 +211,8 @@ std::unique_ptr<ScenarioResult> RunScenario( ...@@ -211,7 +211,8 @@ std::unique_ptr<ScenarioResult> RunScenario(
// Wait some time // Wait some time
gpr_log(GPR_INFO, "Running"); gpr_log(GPR_INFO, "Running");
gpr_sleep_until(gpr_time_add(start, gpr_time_from_seconds(benchmark_seconds))); gpr_sleep_until(
gpr_time_add(start, gpr_time_from_seconds(benchmark_seconds)));
// Finish a run // Finish a run
std::unique_ptr<ScenarioResult> result(new ScenarioResult); std::unique_ptr<ScenarioResult> result(new ScenarioResult);
......
...@@ -100,8 +100,8 @@ int main(int argc, char** argv) { ...@@ -100,8 +100,8 @@ int main(int argc, char** argv) {
// client will deadlock on a timer. // client will deadlock on a timer.
GPR_ASSERT(!(server_type == grpc::testing::SYNCHRONOUS_SERVER && GPR_ASSERT(!(server_type == grpc::testing::SYNCHRONOUS_SERVER &&
rpc_type == grpc::testing::STREAMING && rpc_type == grpc::testing::STREAMING &&
FLAGS_server_threads < FLAGS_client_channels * FLAGS_server_threads <
FLAGS_outstanding_rpcs_per_channel)); FLAGS_client_channels * FLAGS_outstanding_rpcs_per_channel));
const auto result = RunScenario( const auto result = RunScenario(
client_config, FLAGS_num_clients, server_config, FLAGS_num_servers, client_config, FLAGS_num_clients, server_config, FLAGS_num_servers,
......
...@@ -64,17 +64,19 @@ namespace testing { ...@@ -64,17 +64,19 @@ namespace testing {
std::unique_ptr<Client> CreateClient(const ClientConfig& config) { std::unique_ptr<Client> CreateClient(const ClientConfig& config) {
switch (config.client_type()) { switch (config.client_type()) {
case ClientType::SYNCHRONOUS_CLIENT: case ClientType::SYNCHRONOUS_CLIENT:
return (config.rpc_type() == RpcType::UNARY) ? return (config.rpc_type() == RpcType::UNARY)
CreateSynchronousUnaryClient(config) : ? CreateSynchronousUnaryClient(config)
CreateSynchronousStreamingClient(config); : CreateSynchronousStreamingClient(config);
case ClientType::ASYNC_CLIENT: case ClientType::ASYNC_CLIENT:
return (config.rpc_type() == RpcType::UNARY) ? return (config.rpc_type() == RpcType::UNARY)
CreateAsyncUnaryClient(config) : CreateAsyncStreamingClient(config); ? CreateAsyncUnaryClient(config)
: CreateAsyncStreamingClient(config);
} }
abort(); abort();
} }
std::unique_ptr<Server> CreateServer(const ServerConfig& config, int server_port) { std::unique_ptr<Server> CreateServer(const ServerConfig& config,
int server_port) {
switch (config.server_type()) { switch (config.server_type()) {
case ServerType::SYNCHRONOUS_SERVER: case ServerType::SYNCHRONOUS_SERVER:
return CreateSynchronousServer(config, server_port); return CreateSynchronousServer(config, server_port);
...@@ -86,7 +88,8 @@ std::unique_ptr<Server> CreateServer(const ServerConfig& config, int server_port ...@@ -86,7 +88,8 @@ std::unique_ptr<Server> CreateServer(const ServerConfig& config, int server_port
class WorkerImpl GRPC_FINAL : public Worker::Service { class WorkerImpl GRPC_FINAL : public Worker::Service {
public: public:
explicit WorkerImpl(int server_port) : server_port_(server_port), acquired_(false) {} explicit WorkerImpl(int server_port)
: server_port_(server_port), acquired_(false) {}
Status RunTest(ServerContext* ctx, Status RunTest(ServerContext* ctx,
ServerReaderWriter<ClientStatus, ClientArgs>* stream) ServerReaderWriter<ClientStatus, ClientArgs>* stream)
...@@ -97,7 +100,7 @@ class WorkerImpl GRPC_FINAL : public Worker::Service { ...@@ -97,7 +100,7 @@ class WorkerImpl GRPC_FINAL : public Worker::Service {
} }
grpc_profiler_start("qps_client.prof"); grpc_profiler_start("qps_client.prof");
Status ret = RunTestBody(ctx,stream); Status ret = RunTestBody(ctx, stream);
grpc_profiler_stop(); grpc_profiler_stop();
return ret; return ret;
} }
...@@ -111,7 +114,7 @@ class WorkerImpl GRPC_FINAL : public Worker::Service { ...@@ -111,7 +114,7 @@ class WorkerImpl GRPC_FINAL : public Worker::Service {
} }
grpc_profiler_start("qps_server.prof"); grpc_profiler_start("qps_server.prof");
Status ret = RunServerBody(ctx,stream); Status ret = RunServerBody(ctx, stream);
grpc_profiler_stop(); grpc_profiler_stop();
return ret; return ret;
} }
...@@ -226,8 +229,7 @@ QpsWorker::QpsWorker(int driver_port, int server_port) { ...@@ -226,8 +229,7 @@ QpsWorker::QpsWorker(int driver_port, int server_port) {
server_ = std::move(builder.BuildAndStart()); server_ = std::move(builder.BuildAndStart());
} }
QpsWorker::~QpsWorker() { QpsWorker::~QpsWorker() {}
}
} // namespace testing } // namespace testing
} // namespace grpc } // namespace grpc
...@@ -48,18 +48,20 @@ void ReportQPS(const ScenarioResult& result) { ...@@ -48,18 +48,20 @@ void ReportQPS(const ScenarioResult& result) {
} }
// QPS: XXX (YYY/server core) // QPS: XXX (YYY/server core)
void ReportQPSPerCore(const ScenarioResult& result, const ServerConfig& server_config) { void ReportQPSPerCore(const ScenarioResult& result,
auto qps = const ServerConfig& server_config) {
result.latencies.Count() / auto qps = result.latencies.Count() /
average(result.client_resources, average(result.client_resources,
[](ResourceUsage u) { return u.wall_time; }); [](ResourceUsage u) { return u.wall_time; });
gpr_log(GPR_INFO, "QPS: %.1f (%.1f/server core)", qps, qps/server_config.threads()); gpr_log(GPR_INFO, "QPS: %.1f (%.1f/server core)", qps,
qps / server_config.threads());
} }
// Latency (50/90/95/99/99.9%-ile): AA/BB/CC/DD/EE us // Latency (50/90/95/99/99.9%-ile): AA/BB/CC/DD/EE us
void ReportLatency(const ScenarioResult& result) { void ReportLatency(const ScenarioResult& result) {
gpr_log(GPR_INFO, "Latencies (50/90/95/99/99.9%%-ile): %.1f/%.1f/%.1f/%.1f/%.1f us", gpr_log(GPR_INFO,
"Latencies (50/90/95/99/99.9%%-ile): %.1f/%.1f/%.1f/%.1f/%.1f us",
result.latencies.Percentile(50) / 1000, result.latencies.Percentile(50) / 1000,
result.latencies.Percentile(90) / 1000, result.latencies.Percentile(90) / 1000,
result.latencies.Percentile(95) / 1000, result.latencies.Percentile(95) / 1000,
......
...@@ -64,7 +64,7 @@ namespace testing { ...@@ -64,7 +64,7 @@ namespace testing {
class AsyncQpsServerTest : public Server { class AsyncQpsServerTest : public Server {
public: public:
AsyncQpsServerTest(const ServerConfig &config, int port) : shutdown_(false) { AsyncQpsServerTest(const ServerConfig &config, int port) : shutdown_(false) {
char* server_address = NULL; char *server_address = NULL;
gpr_join_host_port(&server_address, "::", port); gpr_join_host_port(&server_address, "::", port);
ServerBuilder builder; ServerBuilder builder;
...@@ -95,9 +95,9 @@ class AsyncQpsServerTest : public Server { ...@@ -95,9 +95,9 @@ class AsyncQpsServerTest : public Server {
threads_.push_back(std::thread([=]() { threads_.push_back(std::thread([=]() {
// Wait until work is available or we are shutting down // Wait until work is available or we are shutting down
bool ok; bool ok;
void* got_tag; void *got_tag;
while (srv_cq_->Next(&got_tag, &ok)) { while (srv_cq_->Next(&got_tag, &ok)) {
ServerRpcContext* ctx = detag(got_tag); ServerRpcContext *ctx = detag(got_tag);
// The tag is a pointer to an RPC context to invoke // The tag is a pointer to an RPC context to invoke
if (ctx->RunNextState(ok) == false) { if (ctx->RunNextState(ok) == false) {
// this RPC context is done, so refresh it // this RPC context is done, so refresh it
...@@ -133,23 +133,23 @@ class AsyncQpsServerTest : public Server { ...@@ -133,23 +133,23 @@ class AsyncQpsServerTest : public Server {
ServerRpcContext() {} ServerRpcContext() {}
virtual ~ServerRpcContext(){}; virtual ~ServerRpcContext(){};
virtual bool RunNextState(bool) = 0; // next state, return false if done virtual bool RunNextState(bool) = 0; // next state, return false if done
virtual void Reset() = 0; // start this back at a clean state virtual void Reset() = 0; // start this back at a clean state
}; };
static void* tag(ServerRpcContext* func) { static void *tag(ServerRpcContext *func) {
return reinterpret_cast<void*>(func); return reinterpret_cast<void *>(func);
} }
static ServerRpcContext* detag(void* tag) { static ServerRpcContext *detag(void *tag) {
return reinterpret_cast<ServerRpcContext*>(tag); return reinterpret_cast<ServerRpcContext *>(tag);
} }
template <class RequestType, class ResponseType> template <class RequestType, class ResponseType>
class ServerRpcContextUnaryImpl GRPC_FINAL : public ServerRpcContext { class ServerRpcContextUnaryImpl GRPC_FINAL : public ServerRpcContext {
public: public:
ServerRpcContextUnaryImpl( ServerRpcContextUnaryImpl(
std::function<void(ServerContext*, RequestType*, std::function<void(ServerContext *, RequestType *,
grpc::ServerAsyncResponseWriter<ResponseType>*, grpc::ServerAsyncResponseWriter<ResponseType> *,
void*)> request_method, void *)> request_method,
std::function<grpc::Status(const RequestType*, ResponseType*)> std::function<grpc::Status(const RequestType *, ResponseType *)>
invoke_method) invoke_method)
: next_state_(&ServerRpcContextUnaryImpl::invoker), : next_state_(&ServerRpcContextUnaryImpl::invoker),
request_method_(request_method), request_method_(request_method),
...@@ -159,7 +159,9 @@ class AsyncQpsServerTest : public Server { ...@@ -159,7 +159,9 @@ class AsyncQpsServerTest : public Server {
AsyncQpsServerTest::tag(this)); AsyncQpsServerTest::tag(this));
} }
~ServerRpcContextUnaryImpl() GRPC_OVERRIDE {} ~ServerRpcContextUnaryImpl() GRPC_OVERRIDE {}
bool RunNextState(bool ok) GRPC_OVERRIDE {return (this->*next_state_)(ok);} bool RunNextState(bool ok) GRPC_OVERRIDE {
return (this->*next_state_)(ok);
}
void Reset() GRPC_OVERRIDE { void Reset() GRPC_OVERRIDE {
srv_ctx_ = ServerContext(); srv_ctx_ = ServerContext();
req_ = RequestType(); req_ = RequestType();
...@@ -192,10 +194,10 @@ class AsyncQpsServerTest : public Server { ...@@ -192,10 +194,10 @@ class AsyncQpsServerTest : public Server {
ServerContext srv_ctx_; ServerContext srv_ctx_;
RequestType req_; RequestType req_;
bool (ServerRpcContextUnaryImpl::*next_state_)(bool); bool (ServerRpcContextUnaryImpl::*next_state_)(bool);
std::function<void(ServerContext*, RequestType*, std::function<void(ServerContext *, RequestType *,
grpc::ServerAsyncResponseWriter<ResponseType>*, void*)> grpc::ServerAsyncResponseWriter<ResponseType> *, void *)>
request_method_; request_method_;
std::function<grpc::Status(const RequestType*, ResponseType*)> std::function<grpc::Status(const RequestType *, ResponseType *)>
invoke_method_; invoke_method_;
grpc::ServerAsyncResponseWriter<ResponseType> response_writer_; grpc::ServerAsyncResponseWriter<ResponseType> response_writer_;
}; };
...@@ -204,9 +206,9 @@ class AsyncQpsServerTest : public Server { ...@@ -204,9 +206,9 @@ class AsyncQpsServerTest : public Server {
class ServerRpcContextStreamingImpl GRPC_FINAL : public ServerRpcContext { class ServerRpcContextStreamingImpl GRPC_FINAL : public ServerRpcContext {
public: public:
ServerRpcContextStreamingImpl( ServerRpcContextStreamingImpl(
std::function<void(ServerContext *, std::function<void(ServerContext *, grpc::ServerAsyncReaderWriter<
grpc::ServerAsyncReaderWriter<ResponseType, ResponseType, RequestType> *,
RequestType> *, void *)> request_method, void *)> request_method,
std::function<grpc::Status(const RequestType *, ResponseType *)> std::function<grpc::Status(const RequestType *, ResponseType *)>
invoke_method) invoke_method)
: next_state_(&ServerRpcContextStreamingImpl::request_done), : next_state_(&ServerRpcContextStreamingImpl::request_done),
...@@ -215,14 +217,15 @@ class AsyncQpsServerTest : public Server { ...@@ -215,14 +217,15 @@ class AsyncQpsServerTest : public Server {
stream_(&srv_ctx_) { stream_(&srv_ctx_) {
request_method_(&srv_ctx_, &stream_, AsyncQpsServerTest::tag(this)); request_method_(&srv_ctx_, &stream_, AsyncQpsServerTest::tag(this));
} }
~ServerRpcContextStreamingImpl() GRPC_OVERRIDE { ~ServerRpcContextStreamingImpl() GRPC_OVERRIDE {}
bool RunNextState(bool ok) GRPC_OVERRIDE {
return (this->*next_state_)(ok);
} }
bool RunNextState(bool ok) GRPC_OVERRIDE {return (this->*next_state_)(ok);}
void Reset() GRPC_OVERRIDE { void Reset() GRPC_OVERRIDE {
srv_ctx_ = ServerContext(); srv_ctx_ = ServerContext();
req_ = RequestType(); req_ = RequestType();
stream_ = grpc::ServerAsyncReaderWriter<ResponseType, stream_ =
RequestType>(&srv_ctx_); grpc::ServerAsyncReaderWriter<ResponseType, RequestType>(&srv_ctx_);
// Then request the method // Then request the method
next_state_ = &ServerRpcContextStreamingImpl::request_done; next_state_ = &ServerRpcContextStreamingImpl::request_done;
...@@ -241,47 +244,47 @@ class AsyncQpsServerTest : public Server { ...@@ -241,47 +244,47 @@ class AsyncQpsServerTest : public Server {
bool read_done(bool ok) { bool read_done(bool ok) {
if (ok) { if (ok) {
// invoke the method // invoke the method
ResponseType response; ResponseType response;
// Call the RPC processing function // Call the RPC processing function
grpc::Status status = invoke_method_(&req_, &response); grpc::Status status = invoke_method_(&req_, &response);
// initiate the write // initiate the write
stream_.Write(response, AsyncQpsServerTest::tag(this)); stream_.Write(response, AsyncQpsServerTest::tag(this));
next_state_ = &ServerRpcContextStreamingImpl::write_done; next_state_ = &ServerRpcContextStreamingImpl::write_done;
} else { // client has sent writes done } else { // client has sent writes done
// finish the stream // finish the stream
stream_.Finish(Status::OK, AsyncQpsServerTest::tag(this)); stream_.Finish(Status::OK, AsyncQpsServerTest::tag(this));
next_state_ = &ServerRpcContextStreamingImpl::finish_done; next_state_ = &ServerRpcContextStreamingImpl::finish_done;
} }
return true; return true;
} }
bool write_done(bool ok) { bool write_done(bool ok) {
// now go back and get another streaming read! // now go back and get another streaming read!
if (ok) { if (ok) {
stream_.Read(&req_, AsyncQpsServerTest::tag(this)); stream_.Read(&req_, AsyncQpsServerTest::tag(this));
next_state_ = &ServerRpcContextStreamingImpl::read_done; next_state_ = &ServerRpcContextStreamingImpl::read_done;
} } else {
else { stream_.Finish(Status::OK, AsyncQpsServerTest::tag(this));
stream_.Finish(Status::OK, AsyncQpsServerTest::tag(this)); next_state_ = &ServerRpcContextStreamingImpl::finish_done;
next_state_ = &ServerRpcContextStreamingImpl::finish_done;
} }
return true; return true;
} }
bool finish_done(bool ok) {return false; /* reset the context */ } bool finish_done(bool ok) { return false; /* reset the context */ }
ServerContext srv_ctx_; ServerContext srv_ctx_;
RequestType req_; RequestType req_;
bool (ServerRpcContextStreamingImpl::*next_state_)(bool); bool (ServerRpcContextStreamingImpl::*next_state_)(bool);
std::function<void(ServerContext *, std::function<void(
grpc::ServerAsyncReaderWriter<ResponseType, ServerContext *,
RequestType> *, void *)> request_method_; grpc::ServerAsyncReaderWriter<ResponseType, RequestType> *, void *)>
request_method_;
std::function<grpc::Status(const RequestType *, ResponseType *)> std::function<grpc::Status(const RequestType *, ResponseType *)>
invoke_method_; invoke_method_;
grpc::ServerAsyncReaderWriter<ResponseType,RequestType> stream_; grpc::ServerAsyncReaderWriter<ResponseType, RequestType> stream_;
}; };
static Status ProcessRPC(const SimpleRequest* request, static Status ProcessRPC(const SimpleRequest *request,
SimpleResponse* response) { SimpleResponse *response) {
if (request->response_size() > 0) { if (request->response_size() > 0) {
if (!SetPayload(request->response_type(), request->response_size(), if (!SetPayload(request->response_type(), request->response_size(),
response->mutable_payload())) { response->mutable_payload())) {
...@@ -294,19 +297,20 @@ class AsyncQpsServerTest : public Server { ...@@ -294,19 +297,20 @@ class AsyncQpsServerTest : public Server {
std::unique_ptr<grpc::Server> server_; std::unique_ptr<grpc::Server> server_;
std::unique_ptr<grpc::ServerCompletionQueue> srv_cq_; std::unique_ptr<grpc::ServerCompletionQueue> srv_cq_;
TestService::AsyncService async_service_; TestService::AsyncService async_service_;
std::function<void(ServerContext*, SimpleRequest*, std::function<void(ServerContext *, SimpleRequest *,
grpc::ServerAsyncResponseWriter<SimpleResponse>*, void*)> grpc::ServerAsyncResponseWriter<SimpleResponse> *, void *)>
request_unary_; request_unary_;
std::function<void(ServerContext*, grpc::ServerAsyncReaderWriter< std::function<void(
SimpleResponse,SimpleRequest>*, void*)> ServerContext *,
grpc::ServerAsyncReaderWriter<SimpleResponse, SimpleRequest> *, void *)>
request_streaming_; request_streaming_;
std::forward_list<ServerRpcContext*> contexts_; std::forward_list<ServerRpcContext *> contexts_;
std::mutex shutdown_mutex_; std::mutex shutdown_mutex_;
bool shutdown_; bool shutdown_;
}; };
std::unique_ptr<Server> CreateAsyncServer(const ServerConfig& config, std::unique_ptr<Server> CreateAsyncServer(const ServerConfig &config,
int port) { int port) {
return std::unique_ptr<Server>(new AsyncQpsServerTest(config, port)); return std::unique_ptr<Server>(new AsyncQpsServerTest(config, port));
} }
......
...@@ -70,18 +70,18 @@ class TestServiceImpl GRPC_FINAL : public TestService::Service { ...@@ -70,18 +70,18 @@ class TestServiceImpl GRPC_FINAL : public TestService::Service {
} }
return Status::OK; return Status::OK;
} }
Status StreamingCall(ServerContext *context, Status StreamingCall(
ServerReaderWriter<SimpleResponse, SimpleRequest>* ServerContext* context,
stream) GRPC_OVERRIDE { ServerReaderWriter<SimpleResponse, SimpleRequest>* stream) GRPC_OVERRIDE {
SimpleRequest request; SimpleRequest request;
while (stream->Read(&request)) { while (stream->Read(&request)) {
SimpleResponse response; SimpleResponse response;
if (request.response_size() > 0) { if (request.response_size() > 0) {
if (!Server::SetPayload(request.response_type(), if (!Server::SetPayload(request.response_type(),
request.response_size(), request.response_size(),
response.mutable_payload())) { response.mutable_payload())) {
return Status(grpc::StatusCode::INTERNAL, "Error creating payload."); return Status(grpc::StatusCode::INTERNAL, "Error creating payload.");
} }
} }
stream->Write(response); stream->Write(response);
} }
......
...@@ -47,7 +47,7 @@ DEFINE_int32(server_port, 0, "Spawned server port."); ...@@ -47,7 +47,7 @@ DEFINE_int32(server_port, 0, "Spawned server port.");
static bool got_sigint = false; static bool got_sigint = false;
static void sigint_handler(int x) {got_sigint = true;} static void sigint_handler(int x) { got_sigint = true; }
namespace grpc { namespace grpc {
namespace testing { namespace testing {
...@@ -69,6 +69,6 @@ int main(int argc, char** argv) { ...@@ -69,6 +69,6 @@ int main(int argc, char** argv) {
signal(SIGINT, sigint_handler); signal(SIGINT, sigint_handler);
grpc::testing::RunServer(); grpc::testing::RunServer();
return 0; return 0;
} }
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment