Skip to content
Snippets Groups Projects
Commit f993194b authored by vjpai's avatar vjpai
Browse files

Merge branch 'stream_ctx' into poisson

Conflicts:
	test/cpp/qps/client_sync.cc
parents 8dd7aab0 0fe99440
No related branches found
No related tags found
No related merge requests found
...@@ -68,12 +68,12 @@ class SynchronousClient : public Client { ...@@ -68,12 +68,12 @@ class SynchronousClient : public Client {
public: public:
SynchronousClient(const ClientConfig& config) : Client(config) { SynchronousClient(const ClientConfig& config) : Client(config) {
num_threads_ = num_threads_ =
config.outstanding_rpcs_per_channel() * config.client_channels(); config.outstanding_rpcs_per_channel() * config.client_channels();
responses_.resize(num_threads_); responses_.resize(num_threads_);
SetupLoadTest(config, num_threads_); SetupLoadTest(config, num_threads_);
} }
virtual ~SynchronousClient() {}; virtual ~SynchronousClient(){};
protected: protected:
void WaitToIssue(int thread_idx) { void WaitToIssue(int thread_idx) {
...@@ -89,9 +89,11 @@ class SynchronousClient : public Client { ...@@ -89,9 +89,11 @@ class SynchronousClient : public Client {
class SynchronousUnaryClient GRPC_FINAL : public SynchronousClient { class SynchronousUnaryClient GRPC_FINAL : public SynchronousClient {
public: public:
SynchronousUnaryClient(const ClientConfig& config): SynchronousUnaryClient(const ClientConfig& config)
SynchronousClient(config) {StartThreads(num_threads_);} : SynchronousClient(config) {
~SynchronousUnaryClient() {EndThreads();} StartThreads(num_threads_);
}
~SynchronousUnaryClient() { EndThreads(); }
bool ThreadFunc(Histogram* histogram, size_t thread_idx) GRPC_OVERRIDE { bool ThreadFunc(Histogram* histogram, size_t thread_idx) GRPC_OVERRIDE {
WaitToIssue(thread_idx); WaitToIssue(thread_idx);
...@@ -107,44 +109,47 @@ class SynchronousUnaryClient GRPC_FINAL : public SynchronousClient { ...@@ -107,44 +109,47 @@ class SynchronousUnaryClient GRPC_FINAL : public SynchronousClient {
class SynchronousStreamingClient GRPC_FINAL : public SynchronousClient { class SynchronousStreamingClient GRPC_FINAL : public SynchronousClient {
public: public:
SynchronousStreamingClient(const ClientConfig& config): SynchronousStreamingClient(const ClientConfig& config)
SynchronousClient(config) { : SynchronousClient(config), context_(num_threads_), stream_(num_threads_) {
for (size_t thread_idx=0;thread_idx<num_threads_;thread_idx++){ for (size_t thread_idx = 0; thread_idx < num_threads_; thread_idx++) {
auto* stub = channels_[thread_idx % channels_.size()].get_stub(); auto* stub = channels_[thread_idx % channels_.size()].get_stub();
stream_ = stub->StreamingCall(&context_); stream_[thread_idx] = stub->StreamingCall(&context_[thread_idx]);
} }
StartThreads(num_threads_); StartThreads(num_threads_);
} }
~SynchronousStreamingClient() { ~SynchronousStreamingClient() {
EndThreads(); EndThreads();
if (stream_) { for (auto stream = stream_.begin(); stream != stream_.end(); stream++) {
SimpleResponse response; if (*stream) {
stream_->WritesDone(); (*stream)->WritesDone();
EXPECT_TRUE(stream_->Finish().IsOk()); EXPECT_TRUE((*stream)->Finish().IsOk());
}
} }
} }
bool ThreadFunc(Histogram* histogram, size_t thread_idx) GRPC_OVERRIDE { bool ThreadFunc(Histogram* histogram, size_t thread_idx) GRPC_OVERRIDE {
WaitToIssue(thread_idx); WaitToIssue(thread_idx);
double start = Timer::Now(); double start = Timer::Now();
if (stream_->Write(request_) && stream_->Read(&responses_[thread_idx])) { if (stream_[thread_idx]->Write(request_) &&
stream_[thread_idx]->Read(&responses_[thread_idx])) {
histogram->Add((Timer::Now() - start) * 1e9); histogram->Add((Timer::Now() - start) * 1e9);
return true; return true;
} }
return false; return false;
} }
private:
grpc::ClientContext context_; private:
std::unique_ptr<grpc::ClientReaderWriter<SimpleRequest, std::vector<grpc::ClientContext> context_;
SimpleResponse>> stream_; std::vector<std::unique_ptr<grpc::ClientReaderWriter<
SimpleRequest, SimpleResponse>>> stream_;
}; };
std::unique_ptr<Client> std::unique_ptr<Client> CreateSynchronousUnaryClient(
CreateSynchronousUnaryClient(const ClientConfig& config) { const ClientConfig& config) {
return std::unique_ptr<Client>(new SynchronousUnaryClient(config)); return std::unique_ptr<Client>(new SynchronousUnaryClient(config));
} }
std::unique_ptr<Client> std::unique_ptr<Client> CreateSynchronousStreamingClient(
CreateSynchronousStreamingClient(const ClientConfig& config) { const ClientConfig& config) {
return std::unique_ptr<Client>(new SynchronousStreamingClient(config)); return std::unique_ptr<Client>(new SynchronousStreamingClient(config));
} }
......
...@@ -135,6 +135,15 @@ int main(int argc, char** argv) { ...@@ -135,6 +135,15 @@ int main(int argc, char** argv) {
server_config.set_threads(FLAGS_server_threads); server_config.set_threads(FLAGS_server_threads);
server_config.set_enable_ssl(FLAGS_enable_ssl); server_config.set_enable_ssl(FLAGS_enable_ssl);
// If we're running a sync-server streaming test, make sure
// that we have at least as many threads as the active streams
// or else threads will be blocked from forward progress and the
// client will deadlock on a timer.
GPR_ASSERT(!(server_type == grpc::testing::SYNCHRONOUS_SERVER &&
rpc_type == grpc::testing::STREAMING &&
FLAGS_server_threads < FLAGS_client_channels *
FLAGS_outstanding_rpcs_per_channel));
auto result = RunScenario(client_config, FLAGS_num_clients, auto result = RunScenario(client_config, FLAGS_num_clients,
server_config, FLAGS_num_servers, server_config, FLAGS_num_servers,
FLAGS_warmup_seconds, FLAGS_benchmark_seconds, FLAGS_warmup_seconds, FLAGS_benchmark_seconds,
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment