Skip to content
Snippets Groups Projects
Commit 2bac1f8c authored by Craig Tiller's avatar Craig Tiller
Browse files

Merge pull request #1470 from vjpai/stream_ctx

Fix a bug in synchronous streaming QPS test
parents f0b4c587 0fe99440
No related branches found
No related tags found
No related merge requests found
......@@ -66,11 +66,11 @@ class SynchronousClient : public Client {
public:
SynchronousClient(const ClientConfig& config) : Client(config) {
num_threads_ =
config.outstanding_rpcs_per_channel() * config.client_channels();
config.outstanding_rpcs_per_channel() * config.client_channels();
responses_.resize(num_threads_);
}
virtual ~SynchronousClient() {};
virtual ~SynchronousClient(){};
protected:
size_t num_threads_;
......@@ -79,10 +79,12 @@ class SynchronousClient : public Client {
class SynchronousUnaryClient GRPC_FINAL : public SynchronousClient {
public:
SynchronousUnaryClient(const ClientConfig& config):
SynchronousClient(config) {StartThreads(num_threads_);}
~SynchronousUnaryClient() {EndThreads();}
SynchronousUnaryClient(const ClientConfig& config)
: SynchronousClient(config) {
StartThreads(num_threads_);
}
~SynchronousUnaryClient() { EndThreads(); }
bool ThreadFunc(Histogram* histogram, size_t thread_idx) GRPC_OVERRIDE {
auto* stub = channels_[thread_idx % channels_.size()].get_stub();
double start = Timer::Now();
......@@ -96,43 +98,46 @@ class SynchronousUnaryClient GRPC_FINAL : public SynchronousClient {
class SynchronousStreamingClient GRPC_FINAL : public SynchronousClient {
public:
SynchronousStreamingClient(const ClientConfig& config):
SynchronousClient(config) {
for (size_t thread_idx=0;thread_idx<num_threads_;thread_idx++){
SynchronousStreamingClient(const ClientConfig& config)
: SynchronousClient(config), context_(num_threads_), stream_(num_threads_) {
for (size_t thread_idx = 0; thread_idx < num_threads_; thread_idx++) {
auto* stub = channels_[thread_idx % channels_.size()].get_stub();
stream_ = stub->StreamingCall(&context_);
stream_[thread_idx] = stub->StreamingCall(&context_[thread_idx]);
}
StartThreads(num_threads_);
}
~SynchronousStreamingClient() {
EndThreads();
if (stream_) {
SimpleResponse response;
stream_->WritesDone();
EXPECT_TRUE(stream_->Finish().IsOk());
for (auto stream = stream_.begin(); stream != stream_.end(); stream++) {
if (*stream) {
(*stream)->WritesDone();
EXPECT_TRUE((*stream)->Finish().IsOk());
}
}
}
bool ThreadFunc(Histogram* histogram, size_t thread_idx) GRPC_OVERRIDE {
double start = Timer::Now();
if (stream_->Write(request_) && stream_->Read(&responses_[thread_idx])) {
if (stream_[thread_idx]->Write(request_) &&
stream_[thread_idx]->Read(&responses_[thread_idx])) {
histogram->Add((Timer::Now() - start) * 1e9);
return true;
}
return false;
}
private:
grpc::ClientContext context_;
std::unique_ptr<grpc::ClientReaderWriter<SimpleRequest,
SimpleResponse>> stream_;
private:
std::vector<grpc::ClientContext> context_;
std::vector<std::unique_ptr<grpc::ClientReaderWriter<
SimpleRequest, SimpleResponse>>> stream_;
};
std::unique_ptr<Client>
CreateSynchronousUnaryClient(const ClientConfig& config) {
std::unique_ptr<Client> CreateSynchronousUnaryClient(
const ClientConfig& config) {
return std::unique_ptr<Client>(new SynchronousUnaryClient(config));
}
std::unique_ptr<Client>
CreateSynchronousStreamingClient(const ClientConfig& config) {
std::unique_ptr<Client> CreateSynchronousStreamingClient(
const ClientConfig& config) {
return std::unique_ptr<Client>(new SynchronousStreamingClient(config));
}
......
......@@ -94,6 +94,15 @@ int main(int argc, char** argv) {
server_config.set_threads(FLAGS_server_threads);
server_config.set_enable_ssl(FLAGS_enable_ssl);
// If we're running a sync-server streaming test, make sure
// that we have at least as many threads as the active streams
// or else threads will be blocked from forward progress and the
// client will deadlock on a timer.
GPR_ASSERT(!(server_type == grpc::testing::SYNCHRONOUS_SERVER &&
rpc_type == grpc::testing::STREAMING &&
FLAGS_server_threads < FLAGS_client_channels *
FLAGS_outstanding_rpcs_per_channel));
auto result = RunScenario(client_config, FLAGS_num_clients,
server_config, FLAGS_num_servers,
FLAGS_warmup_seconds, FLAGS_benchmark_seconds,
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment