Skip to content
Snippets Groups Projects
Commit eed63fa2 authored by Vijay Pai's avatar Vijay Pai
Browse files

Remaining changes needed to get QPS test working on old compilers.

This change contains a lot of ugly changes, such as changing
std::vector to allocation by new, etc.
parent 8cfff315
No related branches found
No related tags found
No related merge requests found
...@@ -69,10 +69,10 @@ typedef std::chrono::time_point<grpc_time_source> grpc_time; ...@@ -69,10 +69,10 @@ typedef std::chrono::time_point<grpc_time_source> grpc_time;
class Client { class Client {
public: public:
explicit Client(const ClientConfig& config) explicit Client(const ClientConfig& config)
: timer_(new Timer), interarrival_timer_() { : channels_(config.client_channels()), timer_(new Timer), interarrival_timer_() {
for (int i = 0; i < config.client_channels(); i++) { for (int i = 0; i < config.client_channels(); i++) {
channels_.emplace_back( channels_[i].init(config.server_targets(i % config.server_targets_size()),
config.server_targets(i % config.server_targets_size()), config); config);
} }
request_.set_response_type(grpc::testing::PayloadType::COMPRESSABLE); request_.set_response_type(grpc::testing::PayloadType::COMPRESSABLE);
request_.set_response_size(config.payload_size()); request_.set_response_size(config.payload_size());
...@@ -81,7 +81,7 @@ class Client { ...@@ -81,7 +81,7 @@ class Client {
ClientStats Mark() { ClientStats Mark() {
Histogram latencies; Histogram latencies;
std::vector<Histogram> to_merge(threads_.size()); Histogram to_merge[threads_.size()]; // avoid std::vector for old compilers
for (size_t i = 0; i < threads_.size(); i++) { for (size_t i = 0; i < threads_.size(); i++) {
threads_[i]->BeginSwap(&to_merge[i]); threads_[i]->BeginSwap(&to_merge[i]);
} }
...@@ -108,12 +108,16 @@ class Client { ...@@ -108,12 +108,16 @@ class Client {
class ClientChannelInfo { class ClientChannelInfo {
public: public:
ClientChannelInfo(const grpc::string& target, const ClientConfig& config) ClientChannelInfo() {}
: channel_(CreateTestChannel(target, config.enable_ssl())), ClientChannelInfo(const ClientChannelInfo& i): channel_(), stub_() {
stub_(TestService::NewStub(channel_)) {} GPR_ASSERT(!i.channel_ && !i.stub_);
}
void init(const grpc::string& target, const ClientConfig& config) {
channel_ = CreateTestChannel(target, config.enable_ssl());
stub_ = TestService::NewStub(channel_);
}
ChannelInterface* get_channel() { return channel_.get(); } ChannelInterface* get_channel() { return channel_.get(); }
TestService::Stub* get_stub() { return stub_.get(); } TestService::Stub* get_stub() { return stub_.get(); }
private: private:
std::shared_ptr<ChannelInterface> channel_; std::shared_ptr<ChannelInterface> channel_;
std::unique_ptr<TestService::Stub> stub_; std::unique_ptr<TestService::Stub> stub_;
......
...@@ -156,7 +156,7 @@ class AsyncClient : public Client { ...@@ -156,7 +156,7 @@ class AsyncClient : public Client {
std::function<ClientRpcContext*(int, TestService::Stub*, std::function<ClientRpcContext*(int, TestService::Stub*,
const SimpleRequest&)> setup_ctx) const SimpleRequest&)> setup_ctx)
: Client(config), : Client(config),
channel_lock_(config.client_channels()), channel_lock_(new std::mutex[config.client_channels()]),
contexts_(config.client_channels()), contexts_(config.client_channels()),
max_outstanding_per_channel_(config.outstanding_rpcs_per_channel()), max_outstanding_per_channel_(config.outstanding_rpcs_per_channel()),
channel_count_(config.client_channels()), channel_count_(config.client_channels()),
...@@ -208,6 +208,7 @@ class AsyncClient : public Client { ...@@ -208,6 +208,7 @@ class AsyncClient : public Client {
delete ctx; delete ctx;
} }
} }
delete[] channel_lock_;
} }
bool ThreadFunc(Histogram* histogram, bool ThreadFunc(Histogram* histogram,
...@@ -336,7 +337,7 @@ class AsyncClient : public Client { ...@@ -336,7 +337,7 @@ class AsyncClient : public Client {
std::vector<boolean> issue_allowed_; // may this thread attempt to issue std::vector<boolean> issue_allowed_; // may this thread attempt to issue
std::vector<grpc_time> next_issue_; // when should it issue? std::vector<grpc_time> next_issue_; // when should it issue?
std::vector<std::mutex> channel_lock_; std::mutex *channel_lock_; // a vector, but avoid std::vector for old compilers
std::vector<context_list> contexts_; // per-channel list of idle contexts std::vector<context_list> contexts_; // per-channel list of idle contexts
int max_outstanding_per_channel_; int max_outstanding_per_channel_;
int channel_count_; int channel_count_;
......
...@@ -113,9 +113,10 @@ class SynchronousUnaryClient GRPC_FINAL : public SynchronousClient { ...@@ -113,9 +113,10 @@ class SynchronousUnaryClient GRPC_FINAL : public SynchronousClient {
class SynchronousStreamingClient GRPC_FINAL : public SynchronousClient { class SynchronousStreamingClient GRPC_FINAL : public SynchronousClient {
public: public:
SynchronousStreamingClient(const ClientConfig& config) SynchronousStreamingClient(const ClientConfig& config)
: SynchronousClient(config), : SynchronousClient(config) {
context_(num_threads_), context_ = new grpc::ClientContext[num_threads_];
stream_(num_threads_) { stream_ = new std::unique_ptr<
grpc::ClientReaderWriter<SimpleRequest, SimpleResponse>>[num_threads_];
for (size_t thread_idx = 0; thread_idx < num_threads_; thread_idx++) { for (size_t thread_idx = 0; thread_idx < num_threads_; thread_idx++) {
auto* stub = channels_[thread_idx % channels_.size()].get_stub(); auto* stub = channels_[thread_idx % channels_.size()].get_stub();
stream_[thread_idx] = stub->StreamingCall(&context_[thread_idx]); stream_[thread_idx] = stub->StreamingCall(&context_[thread_idx]);
...@@ -124,12 +125,14 @@ class SynchronousStreamingClient GRPC_FINAL : public SynchronousClient { ...@@ -124,12 +125,14 @@ class SynchronousStreamingClient GRPC_FINAL : public SynchronousClient {
} }
~SynchronousStreamingClient() { ~SynchronousStreamingClient() {
EndThreads(); EndThreads();
for (auto stream = stream_.begin(); stream != stream_.end(); stream++) { for (auto stream = &stream_[0]; stream != &stream_[num_threads_]; stream++) {
if (*stream) { if (*stream) {
(*stream)->WritesDone(); (*stream)->WritesDone();
EXPECT_TRUE((*stream)->Finish().ok()); EXPECT_TRUE((*stream)->Finish().ok());
} }
} }
delete[] stream_;
delete[] context_;
} }
bool ThreadFunc(Histogram* histogram, size_t thread_idx) GRPC_OVERRIDE { bool ThreadFunc(Histogram* histogram, size_t thread_idx) GRPC_OVERRIDE {
...@@ -144,9 +147,11 @@ class SynchronousStreamingClient GRPC_FINAL : public SynchronousClient { ...@@ -144,9 +147,11 @@ class SynchronousStreamingClient GRPC_FINAL : public SynchronousClient {
} }
private: private:
std::vector<grpc::ClientContext> context_; // These are both conceptually std::vector but cannot be for old compilers
std::vector<std::unique_ptr< // that expect contained classes to support copy constructors
grpc::ClientReaderWriter<SimpleRequest, SimpleResponse>>> stream_; grpc::ClientContext *context_;
std::unique_ptr<
grpc::ClientReaderWriter<SimpleRequest, SimpleResponse>>* stream_;
}; };
std::unique_ptr<Client> CreateSynchronousUnaryClient( std::unique_ptr<Client> CreateSynchronousUnaryClient(
......
...@@ -149,19 +149,18 @@ std::unique_ptr<ScenarioResult> RunScenario( ...@@ -149,19 +149,18 @@ std::unique_ptr<ScenarioResult> RunScenario(
// Start servers // Start servers
using runsc::ServerData; using runsc::ServerData;
vector<ServerData> servers; ServerData servers[num_servers];
for (size_t i = 0; i < num_servers; i++) { for (size_t i = 0; i < num_servers; i++) {
ServerData sd; servers[i].stub = std::move(Worker::NewStub(
sd.stub = std::move(Worker::NewStub(
CreateChannel(workers[i], InsecureCredentials(), ChannelArguments()))); CreateChannel(workers[i], InsecureCredentials(), ChannelArguments())));
ServerArgs args; ServerArgs args;
result_server_config = server_config; result_server_config = server_config;
result_server_config.set_host(workers[i]); result_server_config.set_host(workers[i]);
*args.mutable_setup() = server_config; *args.mutable_setup() = server_config;
sd.stream = std::move(sd.stub->RunServer(runsc::AllocContext(&contexts))); servers[i].stream = std::move(servers[i].stub->RunServer(runsc::AllocContext(&contexts)));
GPR_ASSERT(sd.stream->Write(args)); GPR_ASSERT(servers[i].stream->Write(args));
ServerStatus init_status; ServerStatus init_status;
GPR_ASSERT(sd.stream->Read(&init_status)); GPR_ASSERT(servers[i].stream->Read(&init_status));
char* host; char* host;
char* driver_port; char* driver_port;
char* cli_target; char* cli_target;
...@@ -171,27 +170,22 @@ std::unique_ptr<ScenarioResult> RunScenario( ...@@ -171,27 +170,22 @@ std::unique_ptr<ScenarioResult> RunScenario(
gpr_free(host); gpr_free(host);
gpr_free(driver_port); gpr_free(driver_port);
gpr_free(cli_target); gpr_free(cli_target);
servers.push_back(std::move(sd));
} }
// Start clients // Start clients
using runsc::ClientData; using runsc::ClientData;
vector<ClientData> clients; ClientData clients[num_clients];
for (size_t i = 0; i < num_clients; i++) { for (size_t i = 0; i < num_clients; i++) {
ClientData cd; clients[i].stub = std::move(Worker::NewStub(CreateChannel(
cd.stub = std::move(Worker::NewStub(CreateChannel(
workers[i + num_servers], InsecureCredentials(), ChannelArguments()))); workers[i + num_servers], InsecureCredentials(), ChannelArguments())));
ClientArgs args; ClientArgs args;
result_client_config = client_config; result_client_config = client_config;
result_client_config.set_host(workers[i + num_servers]); result_client_config.set_host(workers[i + num_servers]);
*args.mutable_setup() = client_config; *args.mutable_setup() = client_config;
cd.stream = std::move(cd.stub->RunTest(runsc::AllocContext(&contexts))); clients[i].stream = std::move(clients[i].stub->RunTest(runsc::AllocContext(&contexts)));
GPR_ASSERT(cd.stream->Write(args)); GPR_ASSERT(clients[i].stream->Write(args));
ClientStatus init_status; ClientStatus init_status;
GPR_ASSERT(cd.stream->Read(&init_status)); GPR_ASSERT(clients[i].stream->Read(&init_status));
clients.push_back(std::move(cd));
} }
// Let everything warmup // Let everything warmup
...@@ -206,18 +200,18 @@ std::unique_ptr<ScenarioResult> RunScenario( ...@@ -206,18 +200,18 @@ std::unique_ptr<ScenarioResult> RunScenario(
server_mark.mutable_mark(); server_mark.mutable_mark();
ClientArgs client_mark; ClientArgs client_mark;
client_mark.mutable_mark(); client_mark.mutable_mark();
for (auto server = servers.begin(); server != servers.end(); server++) { for (auto server = &servers[0]; server != &servers[num_servers]; server++) {
GPR_ASSERT(server->stream->Write(server_mark)); GPR_ASSERT(server->stream->Write(server_mark));
} }
for (auto client = clients.begin(); client != clients.end(); client++) { for (auto client = &clients[0]; client != &clients[num_clients]; client++) {
GPR_ASSERT(client->stream->Write(client_mark)); GPR_ASSERT(client->stream->Write(client_mark));
} }
ServerStatus server_status; ServerStatus server_status;
ClientStatus client_status; ClientStatus client_status;
for (auto server = servers.begin(); server != servers.end(); server++) { for (auto server = &servers[0]; server != &servers[num_servers]; server++) {
GPR_ASSERT(server->stream->Read(&server_status)); GPR_ASSERT(server->stream->Read(&server_status));
} }
for (auto client = clients.begin(); client != clients.end(); client++) { for (auto client = &clients[0]; client != &clients[num_clients]; client++) {
GPR_ASSERT(client->stream->Read(&client_status)); GPR_ASSERT(client->stream->Read(&client_status));
} }
...@@ -231,19 +225,19 @@ std::unique_ptr<ScenarioResult> RunScenario( ...@@ -231,19 +225,19 @@ std::unique_ptr<ScenarioResult> RunScenario(
result->client_config = result_client_config; result->client_config = result_client_config;
result->server_config = result_server_config; result->server_config = result_server_config;
gpr_log(GPR_INFO, "Finishing"); gpr_log(GPR_INFO, "Finishing");
for (auto server = servers.begin(); server != servers.end(); server++) { for (auto server = &servers[0]; server != &servers[num_servers]; server++) {
GPR_ASSERT(server->stream->Write(server_mark)); GPR_ASSERT(server->stream->Write(server_mark));
} }
for (auto client = clients.begin(); client != clients.end(); client++) { for (auto client = &clients[0]; client != &clients[num_clients]; client++) {
GPR_ASSERT(client->stream->Write(client_mark)); GPR_ASSERT(client->stream->Write(client_mark));
} }
for (auto server = servers.begin(); server != servers.end(); server++) { for (auto server = &servers[0]; server != &servers[num_servers]; server++) {
GPR_ASSERT(server->stream->Read(&server_status)); GPR_ASSERT(server->stream->Read(&server_status));
const auto& stats = server_status.stats(); const auto& stats = server_status.stats();
result->server_resources.emplace_back( result->server_resources.emplace_back(
stats.time_elapsed(), stats.time_user(), stats.time_system()); stats.time_elapsed(), stats.time_user(), stats.time_system());
} }
for (auto client = clients.begin(); client != clients.end(); client++) { for (auto client = &clients[0]; client != &clients[num_clients]; client++) {
GPR_ASSERT(client->stream->Read(&client_status)); GPR_ASSERT(client->stream->Read(&client_status));
const auto& stats = client_status.stats(); const auto& stats = client_status.stats();
result->latencies.MergeProto(stats.latencies()); result->latencies.MergeProto(stats.latencies());
...@@ -251,11 +245,11 @@ std::unique_ptr<ScenarioResult> RunScenario( ...@@ -251,11 +245,11 @@ std::unique_ptr<ScenarioResult> RunScenario(
stats.time_elapsed(), stats.time_user(), stats.time_system()); stats.time_elapsed(), stats.time_user(), stats.time_system());
} }
for (auto client = clients.begin(); client != clients.end(); client++) { for (auto client = &clients[0]; client != &clients[num_clients]; client++) {
GPR_ASSERT(client->stream->WritesDone()); GPR_ASSERT(client->stream->WritesDone());
GPR_ASSERT(client->stream->Finish().ok()); GPR_ASSERT(client->stream->Finish().ok());
} }
for (auto server = servers.begin(); server != servers.end(); server++) { for (auto server = &servers[0]; server != &servers[num_servers]; server++) {
GPR_ASSERT(server->stream->WritesDone()); GPR_ASSERT(server->stream->WritesDone());
GPR_ASSERT(server->stream->Finish().ok()); GPR_ASSERT(server->stream->Finish().ok());
} }
......
...@@ -36,6 +36,7 @@ ...@@ -36,6 +36,7 @@
#include <gflags/gflags.h> #include <gflags/gflags.h>
#include <grpc/support/log.h> #include <grpc/support/log.h>
#include <signal.h>
#include "test/cpp/qps/driver.h" #include "test/cpp/qps/driver.h"
#include "test/cpp/qps/report.h" #include "test/cpp/qps/report.h"
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment