Skip to content
Snippets Groups Projects
Commit a840de43 authored by Vijay Pai's avatar Vijay Pai
Browse files

Merge pull request #2283 from ctiller/make-it-a-bit-less-locky

Pluck some low hanging concurrency fruit
parents 7510e58b 27df2cf6
No related branches found
No related tags found
No related merge requests found
...@@ -64,7 +64,7 @@ namespace testing { ...@@ -64,7 +64,7 @@ namespace testing {
class AsyncQpsServerTest : public Server { class AsyncQpsServerTest : public Server {
public: public:
AsyncQpsServerTest(const ServerConfig &config, int port) : shutdown_(false) { AsyncQpsServerTest(const ServerConfig &config, int port) {
char *server_address = NULL; char *server_address = NULL;
gpr_join_host_port(&server_address, "::", port); gpr_join_host_port(&server_address, "::", port);
...@@ -96,6 +96,9 @@ class AsyncQpsServerTest : public Server { ...@@ -96,6 +96,9 @@ class AsyncQpsServerTest : public Server {
request_streaming, ProcessRPC)); request_streaming, ProcessRPC));
} }
} }
for (int i = 0; i < config.threads(); i++) {
shutdown_state_.emplace_back(new PerThreadShutdownState());
}
for (int i = 0; i < config.threads(); i++) { for (int i = 0; i < config.threads(); i++) {
threads_.push_back(std::thread([=]() { threads_.push_back(std::thread([=]() {
// Wait until work is available or we are shutting down // Wait until work is available or we are shutting down
...@@ -105,11 +108,9 @@ class AsyncQpsServerTest : public Server { ...@@ -105,11 +108,9 @@ class AsyncQpsServerTest : public Server {
ServerRpcContext *ctx = detag(got_tag); ServerRpcContext *ctx = detag(got_tag);
// The tag is a pointer to an RPC context to invoke // The tag is a pointer to an RPC context to invoke
bool still_going = ctx->RunNextState(ok); bool still_going = ctx->RunNextState(ok);
std::unique_lock<std::mutex> g(shutdown_mutex_); if (!shutdown_state_[i]->shutdown()) {
if (!shutdown_) {
// this RPC context is done, so refresh it // this RPC context is done, so refresh it
if (!still_going) { if (!still_going) {
g.unlock();
ctx->Reset(); ctx->Reset();
} }
} else { } else {
...@@ -122,9 +123,8 @@ class AsyncQpsServerTest : public Server { ...@@ -122,9 +123,8 @@ class AsyncQpsServerTest : public Server {
} }
~AsyncQpsServerTest() { ~AsyncQpsServerTest() {
server_->Shutdown(); server_->Shutdown();
{ for (auto ss = shutdown_state_.begin(); ss != shutdown_state_.end(); ++ss) {
std::lock_guard<std::mutex> g(shutdown_mutex_); (*ss)->set_shutdown();
shutdown_ = true;
} }
for (auto thr = threads_.begin(); thr != threads_.end(); thr++) { for (auto thr = threads_.begin(); thr != threads_.end(); thr++) {
thr->join(); thr->join();
...@@ -316,8 +316,25 @@ class AsyncQpsServerTest : public Server { ...@@ -316,8 +316,25 @@ class AsyncQpsServerTest : public Server {
TestService::AsyncService async_service_; TestService::AsyncService async_service_;
std::forward_list<ServerRpcContext *> contexts_; std::forward_list<ServerRpcContext *> contexts_;
std::mutex shutdown_mutex_; class PerThreadShutdownState {
bool shutdown_; public:
PerThreadShutdownState() : shutdown_(false) {}
bool shutdown() const {
std::lock_guard<std::mutex> lock(mutex_);
return shutdown_;
}
void set_shutdown() {
std::lock_guard<std::mutex> lock(mutex_);
shutdown_ = true;
}
private:
mutable std::mutex mutex_;
bool shutdown_;
};
std::vector<std::unique_ptr<PerThreadShutdownState>> shutdown_state_;
}; };
std::unique_ptr<Server> CreateAsyncServer(const ServerConfig &config, std::unique_ptr<Server> CreateAsyncServer(const ServerConfig &config,
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment