Commit a182bf12 authored by Craig Tiller

clang-format

parent ce78a19d
@@ -84,8 +84,7 @@ class Client {
   class ClientChannelInfo {
    public:
-    explicit ClientChannelInfo(const grpc::string& target,
-                               const ClientConfig& config)
+    ClientChannelInfo(const grpc::string& target, const ClientConfig& config)
         : channel_(CreateTestChannel(target, config.enable_ssl())),
           stub_(TestService::NewStub(channel_)) {}
     ChannelInterface* get_channel() { return channel_.get(); }
@@ -98,14 +97,12 @@ class Client {
   std::vector<ClientChannelInfo> channels_;
   void StartThreads(size_t num_threads) {
     for (size_t i = 0; i < num_threads; i++) {
       threads_.emplace_back(new Thread(this, i));
     }
   }
-  void EndThreads() {
-    threads_.clear();
-  }
+  void EndThreads() { threads_.clear(); }
   virtual void ThreadFunc(Histogram* histogram, size_t thread_idx) = 0;
@@ -72,22 +72,22 @@ template <class RequestType, class ResponseType>
 class ClientRpcContextUnaryImpl : public ClientRpcContext {
  public:
   ClientRpcContextUnaryImpl(
-      TestService::Stub *stub,
-      const RequestType &req,
+      TestService::Stub *stub, const RequestType &req,
       std::function<
           std::unique_ptr<grpc::ClientAsyncResponseReader<ResponseType>>(
               TestService::Stub *, grpc::ClientContext *, const RequestType &,
               void *)> start_req,
       std::function<void(grpc::Status, ResponseType *)> on_done)
       : context_(),
         stub_(stub),
         req_(req),
         response_(),
         next_state_(&ClientRpcContextUnaryImpl::ReqSent),
-        callback_(on_done), start_req_(start_req),
+        callback_(on_done),
+        start_req_(start_req),
         start_(Timer::Now()),
         response_reader_(
             start_req(stub_, &context_, req_, ClientRpcContext::tag(this))) {}
   ~ClientRpcContextUnaryImpl() GRPC_OVERRIDE {}
   bool RunNextState() GRPC_OVERRIDE { return (this->*next_state_)(); }
   void report_stats(Histogram *hist) GRPC_OVERRIDE {
@@ -118,10 +118,9 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext {
   ResponseType response_;
   bool (ClientRpcContextUnaryImpl::*next_state_)();
   std::function<void(grpc::Status, ResponseType *)> callback_;
-  std::function<
-      std::unique_ptr<grpc::ClientAsyncResponseReader<ResponseType>>(
-          TestService::Stub *, grpc::ClientContext *, const RequestType &,
-          void *)> start_req_;
+  std::function<std::unique_ptr<grpc::ClientAsyncResponseReader<ResponseType>>(
+      TestService::Stub *, grpc::ClientContext *, const RequestType &, void *)>
+      start_req_;
   grpc::Status status_;
   double start_;
   std::unique_ptr<grpc::ClientAsyncResponseReader<ResponseType>>
@@ -130,7 +129,7 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext {
 class AsyncClient GRPC_FINAL : public Client {
  public:
-  explicit AsyncClient(const ClientConfig& config) : Client(config) {
+  explicit AsyncClient(const ClientConfig &config) : Client(config) {
     for (int i = 0; i < config.async_client_threads(); i++) {
       cli_cqs_.emplace_back(new CompletionQueue);
     }
@@ -145,16 +144,18 @@ class AsyncClient GRPC_FINAL : public Client {
     int t = 0;
     for (int i = 0; i < config.outstanding_rpcs_per_channel(); i++) {
-      for (auto& channel : channels_) {
+      for (auto &channel : channels_) {
         auto *cq = cli_cqs_[t].get();
         t = (t + 1) % cli_cqs_.size();
-        auto start_req = [cq](TestService::Stub *stub, grpc::ClientContext *ctx, const SimpleRequest& request, void *tag) {
+        auto start_req = [cq](TestService::Stub *stub, grpc::ClientContext *ctx,
+                              const SimpleRequest &request, void *tag) {
           return stub->AsyncUnaryCall(ctx, request, cq, tag);
         };
-        TestService::Stub* stub = channel.get_stub();
-        const SimpleRequest& request = request_;
-        new ClientRpcContextUnaryImpl<SimpleRequest, SimpleResponse>(stub, request, start_req, check_done);
+        TestService::Stub *stub = channel.get_stub();
+        const SimpleRequest &request = request_;
+        new ClientRpcContextUnaryImpl<SimpleRequest, SimpleResponse>(
+            stub, request, start_req, check_done);
       }
     }
@@ -164,7 +165,7 @@ class AsyncClient GRPC_FINAL : public Client {
   ~AsyncClient() GRPC_OVERRIDE {
     EndThreads();
-    for (auto& cq : cli_cqs_) {
+    for (auto &cq : cli_cqs_) {
       cq->Shutdown();
       void *got_tag;
       bool ok;
@@ -192,9 +193,9 @@ class AsyncClient GRPC_FINAL : public Client {
   std::vector<std::unique_ptr<CompletionQueue>> cli_cqs_;
 };

-std::unique_ptr<Client> CreateAsyncClient(const ClientConfig& args) {
+std::unique_ptr<Client> CreateAsyncClient(const ClientConfig &args) {
   return std::unique_ptr<Client>(new AsyncClient(args));
 }

 } // namespace testing
 } // namespace grpc
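
For readers unfamiliar with the next_state_ idiom that ClientRpcContextUnaryImpl uses above, here is a minimal, self-contained sketch; it is not part of the commit, and the class and step names are invented for illustration.

// Sketch of the next_state_ idiom: a pointer-to-member-function records which
// step runs next, and RunNextState() simply dispatches through it.
#include <cstdio>

class TwoStepContext {
 public:
  TwoStepContext() : next_state_(&TwoStepContext::StepOne) {}
  // Mirrors the RunNextState() pattern above: returns false once finished.
  bool RunNextState() { return (this->*next_state_)(); }

 private:
  bool StepOne() {
    std::puts("request sent");
    next_state_ = &TwoStepContext::StepTwo;
    return true;
  }
  bool StepTwo() {
    std::puts("response received");
    return false;  // terminal state, nothing further to run
  }
  bool (TwoStepContext::*next_state_)();
};

int main() {
  TwoStepContext ctx;
  while (ctx.RunNextState()) {
  }
  return 0;
}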
@@ -64,20 +64,20 @@ namespace testing {
 class SynchronousClient GRPC_FINAL : public Client {
  public:
   SynchronousClient(const ClientConfig& config) : Client(config) {
-    size_t num_threads = config.outstanding_rpcs_per_channel() * config.client_channels();
+    size_t num_threads =
+        config.outstanding_rpcs_per_channel() * config.client_channels();
     responses_.resize(num_threads);
     StartThreads(num_threads);
   }
-  ~SynchronousClient() {
-    EndThreads();
-  }
+  ~SynchronousClient() { EndThreads(); }
   void ThreadFunc(Histogram* histogram, size_t thread_idx) {
     auto* stub = channels_[thread_idx % channels_.size()].get_stub();
     double start = Timer::Now();
     grpc::ClientContext context;
-    grpc::Status s = stub->UnaryCall(&context, request_, &responses_[thread_idx]);
+    grpc::Status s =
+        stub->UnaryCall(&context, request_, &responses_[thread_idx]);
     histogram->Add((Timer::Now() - start) * 1e9);
   }
@@ -71,8 +71,10 @@ static vector<string> get_hosts(const string& name) {
   }
 }

-ScenarioResult RunScenario(const ClientConfig& initial_client_config, size_t num_clients,
-                           const ServerConfig& server_config, size_t num_servers) {
+ScenarioResult RunScenario(const ClientConfig& initial_client_config,
+                           size_t num_clients,
+                           const ServerConfig& server_config,
+                           size_t num_servers) {
   // ClientContext allocator (all are destroyed at scope exit)
   list<ClientContext> contexts;
   auto alloc_context = [&contexts]() {
@@ -183,13 +185,15 @@ ScenarioResult RunScenario(const ClientConfig& initial_client_config, size_t num
   for (auto& server : servers) {
     GPR_ASSERT(server.stream->Read(&server_status));
     const auto& stats = server_status.stats();
-    result.server_resources.push_back(ResourceUsage{stats.time_elapsed(), stats.time_user(), stats.time_system()});
+    result.server_resources.push_back(ResourceUsage{
+        stats.time_elapsed(), stats.time_user(), stats.time_system()});
   }
   for (auto& client : clients) {
     GPR_ASSERT(client.stream->Read(&client_status));
     const auto& stats = client_status.stats();
     result.latencies.MergeProto(stats.latencies());
-    result.client_resources.push_back(ResourceUsage{stats.time_elapsed(), stats.time_user(), stats.time_system()});
+    result.client_resources.push_back(ResourceUsage{
+        stats.time_elapsed(), stats.time_user(), stats.time_system()});
   }
   for (auto& client : clients) {
@@ -52,10 +52,10 @@ struct ScenarioResult {
 };

 ScenarioResult RunScenario(const grpc::testing::ClientConfig& client_config,
                            size_t num_clients,
                            const grpc::testing::ServerConfig& server_config,
                            size_t num_servers);

 } // namespace testing
 } // namespace grpc

 #endif
@@ -43,10 +43,10 @@ namespace testing {
 class Histogram {
  public:
   Histogram() : impl_(gpr_histogram_create(0.01, 60e9)) {}
-  ~Histogram() { if (impl_) gpr_histogram_destroy(impl_); }
-  Histogram(Histogram&& other) : impl_(other.impl_) {
-    other.impl_ = nullptr;
-  }
+  ~Histogram() {
+    if (impl_) gpr_histogram_destroy(impl_);
+  }
+  Histogram(Histogram&& other) : impl_(other.impl_) { other.impl_ = nullptr; }
   void Merge(Histogram* h) { gpr_histogram_merge(impl_, h->impl_); }
   void Add(double value) { gpr_histogram_add(impl_, value); }
@@ -93,18 +93,39 @@ int main(int argc, char **argv) {
   server_config.set_enable_ssl(FLAGS_enable_ssl);

   auto result = RunScenario(client_config, FLAGS_num_clients, server_config,
                             FLAGS_num_servers);

-  gpr_log(GPR_INFO, "QPS: %.1f", result.latencies.Count() / average(result.client_resources, [](ResourceUsage u) { return u.wall_time; }));
+  gpr_log(GPR_INFO, "QPS: %.1f",
+          result.latencies.Count() /
+              average(result.client_resources,
+                      [](ResourceUsage u) { return u.wall_time; }));

   gpr_log(GPR_INFO, "Latencies (50/95/99/99.9%%-ile): %.1f/%.1f/%.1f/%.1f us",
-          result.latencies.Percentile(50) / 1000, result.latencies.Percentile(95) / 1000,
-          result.latencies.Percentile(99) / 1000, result.latencies.Percentile(99.9) / 1000);
+          result.latencies.Percentile(50) / 1000,
+          result.latencies.Percentile(95) / 1000,
+          result.latencies.Percentile(99) / 1000,
+          result.latencies.Percentile(99.9) / 1000);

-  gpr_log(GPR_INFO, "Server system time: %.2f%%", 100.0 * sum(result.server_resources, [](ResourceUsage u) { return u.system_time; }) / sum(result.server_resources, [](ResourceUsage u) { return u.wall_time; }));
-  gpr_log(GPR_INFO, "Server user time: %.2f%%", 100.0 * sum(result.server_resources, [](ResourceUsage u) { return u.user_time; }) / sum(result.server_resources, [](ResourceUsage u) { return u.wall_time; }));
-  gpr_log(GPR_INFO, "Client system time: %.2f%%", 100.0 * sum(result.client_resources, [](ResourceUsage u) { return u.system_time; }) / sum(result.client_resources, [](ResourceUsage u) { return u.wall_time; }));
-  gpr_log(GPR_INFO, "Client user time: %.2f%%", 100.0 * sum(result.client_resources, [](ResourceUsage u) { return u.user_time; }) / sum(result.client_resources, [](ResourceUsage u) { return u.wall_time; }));
+  gpr_log(GPR_INFO, "Server system time: %.2f%%",
+          100.0 * sum(result.server_resources,
+                      [](ResourceUsage u) { return u.system_time; }) /
+              sum(result.server_resources,
+                  [](ResourceUsage u) { return u.wall_time; }));
+  gpr_log(GPR_INFO, "Server user time: %.2f%%",
+          100.0 * sum(result.server_resources,
+                      [](ResourceUsage u) { return u.user_time; }) /
+              sum(result.server_resources,
+                  [](ResourceUsage u) { return u.wall_time; }));
+  gpr_log(GPR_INFO, "Client system time: %.2f%%",
+          100.0 * sum(result.client_resources,
+                      [](ResourceUsage u) { return u.system_time; }) /
+              sum(result.client_resources,
+                  [](ResourceUsage u) { return u.wall_time; }));
+  gpr_log(GPR_INFO, "Client user time: %.2f%%",
+          100.0 * sum(result.client_resources,
+                      [](ResourceUsage u) { return u.user_time; }) /
+              sum(result.client_resources,
+                  [](ResourceUsage u) { return u.wall_time; }));

   grpc_shutdown();
   return 0;
@@ -42,7 +42,7 @@ namespace testing {
 class Server {
  public:
-  Server():timer_(new Timer) {}
+  Server() : timer_(new Timer) {}
   virtual ~Server() {}

   ServerStats Mark() {
@@ -58,17 +58,17 @@ class Server {
     return stats;
   }

   static bool SetPayload(PayloadType type, int size, Payload* payload) {
     PayloadType response_type = type;
     // TODO(yangg): Support UNCOMPRESSABLE payload.
     if (type != PayloadType::COMPRESSABLE) {
       return false;
     }
     payload->set_type(response_type);
     std::unique_ptr<char[]> body(new char[size]());
     payload->set_body(body.get(), size);
     return true;
   }

  private:
   std::unique_ptr<Timer> timer_;
@@ -57,11 +57,12 @@
 #include <grpc/support/log.h>

 namespace grpc {
 namespace testing {

 class AsyncQpsServerTest : public Server {
  public:
-  AsyncQpsServerTest(const ServerConfig& config, int port) : srv_cq_(), async_service_(&srv_cq_), server_(nullptr) {
+  AsyncQpsServerTest(const ServerConfig &config, int port)
+      : srv_cq_(), async_service_(&srv_cq_), server_(nullptr) {
     char *server_address = NULL;
     gpr_join_host_port(&server_address, "::", port);
@@ -103,7 +104,7 @@ class AsyncQpsServerTest : public Server {
   ~AsyncQpsServerTest() {
     server_->Shutdown();
     srv_cq_.Shutdown();
-    for (auto& thr: threads_) {
+    for (auto &thr : threads_) {
       thr.join();
     }
     while (!contexts_.empty()) {
@@ -117,8 +118,8 @@ class AsyncQpsServerTest : public Server {
    public:
     ServerRpcContext() {}
     virtual ~ServerRpcContext(){};
-    virtual bool RunNextState() = 0;// do next state, return false if all done
-    virtual void Reset() = 0; // start this back at a clean state
+    virtual bool RunNextState() = 0;  // do next state, return false if all done
+    virtual void Reset() = 0;         // start this back at a clean state
   };
   static void *tag(ServerRpcContext *func) {
     return reinterpret_cast<void *>(func);
@@ -201,9 +202,10 @@ class AsyncQpsServerTest : public Server {
   std::forward_list<ServerRpcContext *> contexts_;
 };

-std::unique_ptr<Server> CreateAsyncServer(const ServerConfig& config, int port) {
-  return std::unique_ptr<Server>(new AsyncQpsServerTest(config, port));
+std::unique_ptr<Server> CreateAsyncServer(const ServerConfig &config,
+                                          int port) {
+  return std::unique_ptr<Server>(new AsyncQpsServerTest(config, port));
 }

-}// namespace testing
-}// namespace grpc
+} // namespace testing
+} // namespace grpc
@@ -62,8 +62,9 @@ class TestServiceImpl GRPC_FINAL : public TestService::Service {
   Status UnaryCall(ServerContext* context, const SimpleRequest* request,
                    SimpleResponse* response) override {
     if (request->has_response_size() && request->response_size() > 0) {
-      if (!Server::SetPayload(request->response_type(), request->response_size(),
-                              response->mutable_payload())) {
+      if (!Server::SetPayload(request->response_type(),
+                              request->response_size(),
+                              response->mutable_payload())) {
         return Status(grpc::StatusCode::INTERNAL, "Error creating payload.");
       }
     }
@@ -74,8 +75,7 @@ class TestServiceImpl GRPC_FINAL : public TestService::Service {
 class SynchronousServer GRPC_FINAL : public grpc::testing::Server {
  public:
   SynchronousServer(const ServerConfig& config, int port)
-      : thread_pool_(config.threads()),
-        impl_(MakeImpl(port)) {}
+      : thread_pool_(config.threads()), impl_(MakeImpl(port)) {}

  private:
   std::unique_ptr<grpc::Server> MakeImpl(int port) {
@@ -44,7 +44,7 @@ template <class T, class F>
 double sum(const T& container, F functor) {
   double r = 0;
   for (auto v : container) {
     r += functor(v);
   }
   return r;
 }
@@ -54,7 +54,7 @@ double average(const T& container, F functor) {
   return sum(container, functor) / container.size();
 }

 } // namespace testing
 } // namespace grpc

 #endif
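
The sum/average templates in the last hunk are what qps_driver's main() uses to turn per-client ResourceUsage samples into the QPS and CPU-time percentages it logs. Below is a standalone sketch of that computation; it is not part of the commit, the ResourceUsage field names are inferred from the RunScenario and qps_driver hunks above, and the sample numbers are invented for illustration.

// Standalone sketch: combine ResourceUsage samples with sum/average the way
// qps_driver does. Field names follow the hunks above; data is made up.
#include <cstdio>
#include <vector>

struct ResourceUsage {
  double wall_time;
  double user_time;
  double system_time;
};

template <class T, class F>
double sum(const T& container, F functor) {
  double r = 0;
  for (auto v : container) {
    r += functor(v);
  }
  return r;
}

template <class T, class F>
double average(const T& container, F functor) {
  return sum(container, functor) / container.size();
}

int main() {
  // Two hypothetical per-client usage samples, in seconds.
  std::vector<ResourceUsage> client_resources = {{10.0, 6.0, 1.0},
                                                 {10.0, 5.5, 1.5}};
  double rpc_count = 250000;  // stand-in for result.latencies.Count()

  // QPS = total RPCs / average client wall time, as logged by qps_driver.
  double qps = rpc_count / average(client_resources,
                                   [](ResourceUsage u) { return u.wall_time; });
  // CPU-time percentage = 100 * sum(system time) / sum(wall time).
  double system_pct = 100.0 *
                      sum(client_resources,
                          [](ResourceUsage u) { return u.system_time; }) /
                      sum(client_resources,
                          [](ResourceUsage u) { return u.wall_time; });

  std::printf("QPS: %.1f\n", qps);
  std::printf("Client system time: %.2f%%\n", system_pct);
  return 0;
}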