diff --git a/src/proto/grpc/testing/control.proto b/src/proto/grpc/testing/control.proto
index 20496a8116b6090171ab67d4c313003f00a7cd68..ece691081582b5cb307c6bd4a26d622950e6a18b 100644
--- a/src/proto/grpc/testing/control.proto
+++ b/src/proto/grpc/testing/control.proto
@@ -229,4 +229,7 @@ message ScenarioResult {
   repeated int32 server_cores = 5;
   // An after-the-fact computed summary
   ScenarioResultSummary summary = 6;
+  // Information on success or failure of each worker
+  repeated bool client_success = 7;
+  repeated bool server_success = 8;
 }
diff --git a/test/cpp/qps/client.h b/test/cpp/qps/client.h
index 047bd16408224a9e21c1873e5cc98f2bdf50b243..4045e13460f56bb517c2995565093ad7598fb368 100644
--- a/test/cpp/qps/client.h
+++ b/test/cpp/qps/client.h
@@ -112,6 +112,21 @@ class ClientRequestCreator<ByteBuffer> {
   }
 };
 
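+// Holds the latency (if any) recorded by a single invocation of a client
+// ThreadFunc. The per-thread loop merges the entry into that thread's
+// Histogram under the thread's lock, so the histogram itself never has to
+// be passed into (or swapped out from under) ThreadFunc.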
+class HistogramEntry GRPC_FINAL {
+ public:
+  HistogramEntry() : used_(false) {}
+  bool used() const { return used_; }
+  double value() const { return value_; }
+  void set_value(double v) {
+    used_ = true;
+    value_ = v;
+  }
+
+ private:
+  bool used_;
+  double value_;
+};
+
 class Client {
  public:
   Client() : timer_(new UsageTimer), interarrival_timer_() {}
@@ -151,10 +166,21 @@ class Client {
     return stats;
   }
 
+  // Must be called before the destructor runs to avoid a race between the
+  // destructor and a still-executing invocation of the virtual ThreadFunc
+  void AwaitThreadsCompletion() {
+    DestroyMultithreading();
+    std::unique_lock<std::mutex> g(thread_completion_mu_);
+    while (threads_remaining_ != 0) {
+      threads_complete_.wait(g);
+    }
+  }
+
  protected:
   bool closed_loop_;
 
   void StartThreads(size_t num_threads) {
+    threads_remaining_ = num_threads;
     for (size_t i = 0; i < num_threads; i++) {
       threads_.emplace_back(new Thread(this, i));
     }
@@ -162,7 +188,8 @@ class Client {
 
   void EndThreads() { threads_.clear(); }
 
-  virtual bool ThreadFunc(Histogram* histogram, size_t thread_idx) = 0;
+  virtual void DestroyMultithreading() = 0;
+  virtual bool ThreadFunc(HistogramEntry* histogram, size_t thread_idx) = 0;
 
   void SetupLoadTest(const ClientConfig& config, size_t num_threads) {
     // Set up the load distribution based on the number of threads
@@ -215,7 +242,6 @@ class Client {
    public:
     Thread(Client* client, size_t idx)
         : done_(false),
-          new_stats_(nullptr),
           client_(client),
           idx_(idx),
           impl_(&Thread::ThreadFunc, this) {}
@@ -230,15 +256,10 @@ class Client {
 
     void BeginSwap(Histogram* n) {
       std::lock_guard<std::mutex> g(mu_);
-      new_stats_ = n;
+      n->Swap(&histogram_);
     }
 
-    void EndSwap() {
-      std::unique_lock<std::mutex> g(mu_);
-      while (new_stats_ != nullptr) {
-        cv_.wait(g);
-      };
-    }
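+    // BeginSwap now swaps synchronously under the thread's lock, so there
+    // is nothing left to wait for here.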
+    void EndSwap() {}
 
     void MergeStatsInto(Histogram* hist) {
       std::unique_lock<std::mutex> g(mu_);
@@ -252,29 +273,26 @@ class Client {
     void ThreadFunc() {
       for (;;) {
         // run the loop body
-        const bool thread_still_ok = client_->ThreadFunc(&histogram_, idx_);
-        // lock, see if we're done
+        HistogramEntry entry;
+        const bool thread_still_ok = client_->ThreadFunc(&entry, idx_);
+        // lock, update histogram if needed and see if we're done
         std::lock_guard<std::mutex> g(mu_);
+        if (entry.used()) {
+          histogram_.Add(entry.value());
+        }
         if (!thread_still_ok) {
           gpr_log(GPR_ERROR, "Finishing client thread due to RPC error");
           done_ = true;
         }
         if (done_) {
+          client_->CompleteThread();
           return;
         }
-        // check if we're resetting stats, swap out the histogram if so
-        if (new_stats_) {
-          new_stats_->Swap(&histogram_);
-          new_stats_ = nullptr;
-          cv_.notify_one();
-        }
       }
     }
 
     std::mutex mu_;
-    std::condition_variable cv_;
     bool done_;
-    Histogram* new_stats_;
     Histogram histogram_;
     Client* client_;
     const size_t idx_;
@@ -286,6 +304,18 @@ class Client {
 
   InterarrivalTimer interarrival_timer_;
   std::vector<gpr_timespec> next_time_;
+
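+  // Bookkeeping for AwaitThreadsCompletion: every per-thread loop calls
+  // CompleteThread on exit, and the last thread to finish signals the
+  // condition variable.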
+  std::mutex thread_completion_mu_;
+  size_t threads_remaining_;
+  std::condition_variable threads_complete_;
+
+  void CompleteThread() {
+    std::lock_guard<std::mutex> g(thread_completion_mu_);
+    threads_remaining_--;
+    if (threads_remaining_ == 0) {
+      threads_complete_.notify_all();
+    }
+  }
 };
 
 template <class StubType, class RequestType>
diff --git a/test/cpp/qps/client_async.cc b/test/cpp/qps/client_async.cc
index 5dd0bd85339150c37f302a5fca15bdaad61de8fd..5d9cb4bd0cfc35eeecf9db4310a5e6115e70ec86 100644
--- a/test/cpp/qps/client_async.cc
+++ b/test/cpp/qps/client_async.cc
@@ -31,7 +31,6 @@
  *
  */
 
-#include <cassert>
 #include <forward_list>
 #include <functional>
 #include <list>
@@ -48,7 +47,6 @@
 #include <grpc++/generic/generic_stub.h>
 #include <grpc/grpc.h>
 #include <grpc/support/cpu.h>
-#include <grpc/support/histogram.h>
 #include <grpc/support/log.h>
 
 #include "src/proto/grpc/testing/services.grpc.pb.h"
@@ -64,7 +62,7 @@ class ClientRpcContext {
   ClientRpcContext() {}
   virtual ~ClientRpcContext() {}
   // next state, return false if done. Collect stats when appropriate
-  virtual bool RunNextState(bool, Histogram* hist) = 0;
+  virtual bool RunNextState(bool, HistogramEntry* entry) = 0;
   virtual ClientRpcContext* StartNewClone() = 0;
   static void* tag(ClientRpcContext* c) { return reinterpret_cast<void*>(c); }
   static ClientRpcContext* detag(void* t) {
@@ -104,7 +102,7 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext {
       alarm_.reset(new Alarm(cq_, next_issue_(), ClientRpcContext::tag(this)));
     }
   }
-  bool RunNextState(bool ok, Histogram* hist) GRPC_OVERRIDE {
+  bool RunNextState(bool ok, HistogramEntry* entry) GRPC_OVERRIDE {
     switch (next_state_) {
       case State::READY:
         start_ = UsageTimer::Now();
@@ -114,7 +112,7 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext {
         next_state_ = State::RESP_DONE;
         return true;
       case State::RESP_DONE:
-        hist->Add((UsageTimer::Now() - start_) * 1e9);
+        entry->set_value((UsageTimer::Now() - start_) * 1e9);
         callback_(status_, &response_);
         next_state_ = State::INVALID;
         return false;
@@ -176,6 +174,7 @@ class AsyncClient : public ClientImpl<StubType, RequestType> {
     for (int i = 0; i < num_async_threads_; i++) {
       cli_cqs_.emplace_back(new CompletionQueue);
       next_issuers_.emplace_back(NextIssuer(i));
+      shutdown_state_.emplace_back(new PerThreadShutdownState());
     }
 
     using namespace std::placeholders;
@@ -192,7 +191,6 @@ class AsyncClient : public ClientImpl<StubType, RequestType> {
   }
   virtual ~AsyncClient() {
     for (auto cq = cli_cqs_.begin(); cq != cli_cqs_.end(); cq++) {
-      (*cq)->Shutdown();
       void* got_tag;
       bool ok;
       while ((*cq)->Next(&got_tag, &ok)) {
@@ -201,7 +199,36 @@ class AsyncClient : public ClientImpl<StubType, RequestType> {
     }
   }
 
-  bool ThreadFunc(Histogram* histogram,
+ protected:
+  const int num_async_threads_;
+
+ private:
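+  // One shutdown flag per worker thread, each guarded by its own mutex.
+  // DestroyMultithreading sets these flags before shutting down the
+  // completion queues so that ThreadFunc can notice the shutdown and stop
+  // processing events.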
+  struct PerThreadShutdownState {
+    mutable std::mutex mutex;
+    bool shutdown;
+    PerThreadShutdownState() : shutdown(false) {}
+  };
+
+  int NumThreads(const ClientConfig& config) {
+    int num_threads = config.async_client_threads();
+    if (num_threads <= 0) {  // Use dynamic sizing
+      num_threads = cores_;
+      gpr_log(GPR_INFO, "Sizing async client to %d threads", num_threads);
+    }
+    return num_threads;
+  }
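+  // Shut down in three steps: flag every per-thread shutdown state, shut
+  // down the completion queues, and then join the worker threads via
+  // EndThreads.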
+  void DestroyMultithreading() GRPC_OVERRIDE GRPC_FINAL {
+    for (auto ss = shutdown_state_.begin(); ss != shutdown_state_.end(); ++ss) {
+      std::lock_guard<std::mutex> lock((*ss)->mutex);
+      (*ss)->shutdown = true;
+    }
+    for (auto cq = cli_cqs_.begin(); cq != cli_cqs_.end(); cq++) {
+      (*cq)->Shutdown();
+    }
+    // "this->" is needed for name resolution: EndThreads is inherited
+    // through a dependent base class of this class template
+    this->EndThreads();
+  }
+
+  bool ThreadFunc(HistogramEntry* entry,
                   size_t thread_idx) GRPC_OVERRIDE GRPC_FINAL {
     void* got_tag;
     bool ok;
@@ -209,12 +236,15 @@ class AsyncClient : public ClientImpl<StubType, RequestType> {
     switch (cli_cqs_[thread_idx]->AsyncNext(
         &got_tag, &ok,
         std::chrono::system_clock::now() + std::chrono::milliseconds(10))) {
-      case CompletionQueue::SHUTDOWN:
-        return false;
       case CompletionQueue::GOT_EVENT: {
         // Got a regular event, so process it
         ClientRpcContext* ctx = ClientRpcContext::detag(got_tag);
-        if (!ctx->RunNextState(ok, histogram)) {
+        // Proceed while holding a lock to make sure that
+        // this thread isn't supposed to shut down
+        std::lock_guard<std::mutex> l(shutdown_state_[thread_idx]->mutex);
+        if (shutdown_state_[thread_idx]->shutdown) {
+          return true;
+        } else if (!ctx->RunNextState(ok, entry)) {
           // The RPC and callback are done, so clone the ctx
           // and kickstart the new one
           auto clone = ctx->StartNewClone();
@@ -224,29 +254,23 @@ class AsyncClient : public ClientImpl<StubType, RequestType> {
         }
         return true;
       }
-      case CompletionQueue::TIMEOUT:
-        // TODO(ctiller): do something here to track how frequently we pass
-        // through this codepath.
+      case CompletionQueue::TIMEOUT: {
+        std::lock_guard<std::mutex> l(shutdown_state_[thread_idx]->mutex);
+        // Timed out: keep polling regardless of whether a shutdown has been
+        // requested; the per-thread loop in Client decides when to stop.
+        return true;
+      }
+      case CompletionQueue::SHUTDOWN:  // queue is shutting down, so we must be
+                                       // done
         return true;
     }
-    GPR_UNREACHABLE_CODE(return false);
-  }
-
- protected:
-  const int num_async_threads_;
-
- private:
-  int NumThreads(const ClientConfig& config) {
-    int num_threads = config.async_client_threads();
-    if (num_threads <= 0) {  // Use dynamic sizing
-      num_threads = cores_;
-      gpr_log(GPR_INFO, "Sizing async client to %d threads", num_threads);
-    }
-    return num_threads;
+    GPR_UNREACHABLE_CODE(return true);
   }
 
   std::vector<std::unique_ptr<CompletionQueue>> cli_cqs_;
   std::vector<std::function<gpr_timespec()>> next_issuers_;
+  std::vector<std::unique_ptr<PerThreadShutdownState>> shutdown_state_;
 };
 
 static std::unique_ptr<BenchmarkService::Stub> BenchmarkStubCreator(
@@ -262,7 +286,7 @@ class AsyncUnaryClient GRPC_FINAL
             config, SetupCtx, BenchmarkStubCreator) {
     StartThreads(num_async_threads_);
   }
-  ~AsyncUnaryClient() GRPC_OVERRIDE { EndThreads(); }
+  ~AsyncUnaryClient() GRPC_OVERRIDE {}
 
  private:
   static void CheckDone(grpc::Status s, SimpleResponse* response) {}
@@ -307,7 +331,7 @@ class ClientRpcContextStreamingImpl : public ClientRpcContext {
     stream_ = start_req_(stub_, &context_, cq, ClientRpcContext::tag(this));
     next_state_ = State::STREAM_IDLE;
   }
-  bool RunNextState(bool ok, Histogram* hist) GRPC_OVERRIDE {
+  bool RunNextState(bool ok, HistogramEntry* entry) GRPC_OVERRIDE {
     while (true) {
       switch (next_state_) {
         case State::STREAM_IDLE:
@@ -339,7 +363,7 @@ class ClientRpcContextStreamingImpl : public ClientRpcContext {
           return true;
           break;
         case State::READ_DONE:
-          hist->Add((UsageTimer::Now() - start_) * 1e9);
+          entry->set_value((UsageTimer::Now() - start_) * 1e9);
           callback_(status_, &response_);
           next_state_ = State::STREAM_IDLE;
           break;  // loop around
@@ -391,7 +415,7 @@ class AsyncStreamingClient GRPC_FINAL
     StartThreads(num_async_threads_);
   }
 
-  ~AsyncStreamingClient() GRPC_OVERRIDE { EndThreads(); }
+  ~AsyncStreamingClient() GRPC_OVERRIDE {}
 
  private:
   static void CheckDone(grpc::Status s, SimpleResponse* response) {}
@@ -439,7 +463,7 @@ class ClientRpcContextGenericStreamingImpl : public ClientRpcContext {
                          ClientRpcContext::tag(this));
     next_state_ = State::STREAM_IDLE;
   }
-  bool RunNextState(bool ok, Histogram* hist) GRPC_OVERRIDE {
+  bool RunNextState(bool ok, HistogramEntry* entry) GRPC_OVERRIDE {
     while (true) {
       switch (next_state_) {
         case State::STREAM_IDLE:
@@ -471,7 +495,7 @@ class ClientRpcContextGenericStreamingImpl : public ClientRpcContext {
           return true;
           break;
         case State::READ_DONE:
-          hist->Add((UsageTimer::Now() - start_) * 1e9);
+          entry->set_value((UsageTimer::Now() - start_) * 1e9);
           callback_(status_, &response_);
           next_state_ = State::STREAM_IDLE;
           break;  // loop around
@@ -527,7 +551,7 @@ class GenericAsyncStreamingClient GRPC_FINAL
     StartThreads(num_async_threads_);
   }
 
-  ~GenericAsyncStreamingClient() GRPC_OVERRIDE { EndThreads(); }
+  ~GenericAsyncStreamingClient() GRPC_OVERRIDE {}
 
  private:
   static void CheckDone(grpc::Status s, ByteBuffer* response) {}
diff --git a/test/cpp/qps/client_sync.cc b/test/cpp/qps/client_sync.cc
index c88e95b80e5dabcf3aec7c15b6c7ab9861b4c30f..25c78235532625280fb18fc51980b7a4d4c7fc38 100644
--- a/test/cpp/qps/client_sync.cc
+++ b/test/cpp/qps/client_sync.cc
@@ -31,7 +31,6 @@
  *
  */
 
-#include <cassert>
 #include <chrono>
 #include <memory>
 #include <mutex>
@@ -46,7 +45,6 @@
 #include <grpc++/server_builder.h>
 #include <grpc/grpc.h>
 #include <grpc/support/alloc.h>
-#include <grpc/support/histogram.h>
 #include <grpc/support/host_port.h>
 #include <grpc/support/log.h>
 #include <grpc/support/time.h>
@@ -55,7 +53,6 @@
 #include "src/core/lib/profiling/timers.h"
 #include "src/proto/grpc/testing/services.grpc.pb.h"
 #include "test/cpp/qps/client.h"
-#include "test/cpp/qps/histogram.h"
 #include "test/cpp/qps/interarrival.h"
 #include "test/cpp/qps/usage_timer.h"
 
@@ -90,6 +87,9 @@ class SynchronousClient
 
   size_t num_threads_;
   std::vector<SimpleResponse> responses_;
+
+ private:
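+  // Synchronous clients have no completion queues to drain, so stopping the
+  // worker threads is all that is needed here.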
+  void DestroyMultithreading() GRPC_OVERRIDE GRPC_FINAL { EndThreads(); }
 };
 
 class SynchronousUnaryClient GRPC_FINAL : public SynchronousClient {
@@ -98,9 +98,9 @@ class SynchronousUnaryClient GRPC_FINAL : public SynchronousClient {
       : SynchronousClient(config) {
     StartThreads(num_threads_);
   }
-  ~SynchronousUnaryClient() { EndThreads(); }
+  ~SynchronousUnaryClient() {}
 
-  bool ThreadFunc(Histogram* histogram, size_t thread_idx) GRPC_OVERRIDE {
+  bool ThreadFunc(HistogramEntry* entry, size_t thread_idx) GRPC_OVERRIDE {
     WaitToIssue(thread_idx);
     auto* stub = channels_[thread_idx % channels_.size()].get_stub();
     double start = UsageTimer::Now();
@@ -108,7 +108,7 @@ class SynchronousUnaryClient GRPC_FINAL : public SynchronousClient {
     grpc::ClientContext context;
     grpc::Status s =
         stub->UnaryCall(&context, request_, &responses_[thread_idx]);
-    histogram->Add((UsageTimer::Now() - start) * 1e9);
+    entry->set_value((UsageTimer::Now() - start) * 1e9);
     return s.ok();
   }
 };
@@ -127,25 +127,29 @@ class SynchronousStreamingClient GRPC_FINAL : public SynchronousClient {
     StartThreads(num_threads_);
   }
   ~SynchronousStreamingClient() {
-    EndThreads();
-    for (auto stream = &stream_[0]; stream != &stream_[num_threads_];
-         stream++) {
+    for (size_t i = 0; i < num_threads_; i++) {
+      auto stream = &stream_[i];
       if (*stream) {
         (*stream)->WritesDone();
-        EXPECT_TRUE((*stream)->Finish().ok());
+        Status s = (*stream)->Finish();
+        EXPECT_TRUE(s.ok());
+        if (!s.ok()) {
+          gpr_log(GPR_ERROR, "Stream %zu received an error %s", i,
+                  s.error_message().c_str());
+        }
       }
     }
     delete[] stream_;
     delete[] context_;
   }
 
-  bool ThreadFunc(Histogram* histogram, size_t thread_idx) GRPC_OVERRIDE {
+  bool ThreadFunc(HistogramEntry* entry, size_t thread_idx) GRPC_OVERRIDE {
     WaitToIssue(thread_idx);
     GPR_TIMER_SCOPE("SynchronousStreamingClient::ThreadFunc", 0);
     double start = UsageTimer::Now();
     if (stream_[thread_idx]->Write(request_) &&
         stream_[thread_idx]->Read(&responses_[thread_idx])) {
-      histogram->Add((UsageTimer::Now() - start) * 1e9);
+      entry->set_value((UsageTimer::Now() - start) * 1e9);
       return true;
     }
     return false;
diff --git a/test/cpp/qps/driver.cc b/test/cpp/qps/driver.cc
index 08bf0458832e379a4f9ae9ff7411b9716e00bd82..2aeaea51f2540d0df94b93ef7fe3edce6ac36899 100644
--- a/test/cpp/qps/driver.cc
+++ b/test/cpp/qps/driver.cc
@@ -87,7 +87,7 @@ static std::unordered_map<string, std::deque<int>> get_hosts_and_cores(
       CoreRequest dummy;
       CoreResponse cores;
       grpc::Status s = stub->CoreCount(&ctx, dummy, &cores);
-      assert(s.ok());
+      GPR_ASSERT(s.ok());
       std::deque<int> dq;
       for (int i = 0; i < cores.cores(); i++) {
         dq.push_back(i);
@@ -289,9 +289,13 @@ std::unique_ptr<ScenarioResult> RunScenario(
     *args.mutable_setup() = server_config;
     servers[i].stream =
         servers[i].stub->RunServer(runsc::AllocContext(&contexts));
-    GPR_ASSERT(servers[i].stream->Write(args));
+    if (!servers[i].stream->Write(args)) {
+      gpr_log(GPR_ERROR, "Could not write args to server %zu", i);
+    }
     ServerStatus init_status;
-    GPR_ASSERT(servers[i].stream->Read(&init_status));
+    if (!servers[i].stream->Read(&init_status)) {
+      gpr_log(GPR_ERROR, "Server %zu did not yield initial status", i);
+    }
     gpr_join_host_port(&cli_target, host, init_status.port());
     client_config.add_server_targets(cli_target);
     gpr_free(host);
@@ -345,9 +349,13 @@ std::unique_ptr<ScenarioResult> RunScenario(
     *args.mutable_setup() = per_client_config;
     clients[i].stream =
         clients[i].stub->RunClient(runsc::AllocContext(&contexts));
-    GPR_ASSERT(clients[i].stream->Write(args));
+    if (!clients[i].stream->Write(args)) {
+      gpr_log(GPR_ERROR, "Could not write args to client %zu", i);
+    }
     ClientStatus init_status;
-    GPR_ASSERT(clients[i].stream->Read(&init_status));
+    if (!clients[i].stream->Read(&init_status)) {
+      gpr_log(GPR_ERROR, "Client %zu did not yield initial status", i);
+    }
   }
 
   // Let everything warmup
@@ -362,19 +370,31 @@ std::unique_ptr<ScenarioResult> RunScenario(
   server_mark.mutable_mark()->set_reset(true);
   ClientArgs client_mark;
   client_mark.mutable_mark()->set_reset(true);
-  for (auto server = &servers[0]; server != &servers[num_servers]; server++) {
-    GPR_ASSERT(server->stream->Write(server_mark));
+  for (size_t i = 0; i < num_servers; i++) {
+    auto server = &servers[i];
+    if (!server->stream->Write(server_mark)) {
+      gpr_log(GPR_ERROR, "Couldn't write mark to server %zu", i);
+    }
   }
-  for (auto client = &clients[0]; client != &clients[num_clients]; client++) {
-    GPR_ASSERT(client->stream->Write(client_mark));
+  for (size_t i = 0; i < num_clients; i++) {
+    auto client = &clients[i];
+    if (!client->stream->Write(client_mark)) {
+      gpr_log(GPR_ERROR, "Couldn't write mark to client %zu", i);
+    }
   }
   ServerStatus server_status;
   ClientStatus client_status;
-  for (auto server = &servers[0]; server != &servers[num_servers]; server++) {
-    GPR_ASSERT(server->stream->Read(&server_status));
+  for (size_t i = 0; i < num_servers; i++) {
+    auto server = &servers[i];
+    if (!server->stream->Read(&server_status)) {
+      gpr_log(GPR_ERROR, "Couldn't get status from server %zu", i);
+    }
   }
-  for (auto client = &clients[0]; client != &clients[num_clients]; client++) {
-    GPR_ASSERT(client->stream->Read(&client_status));
+  for (size_t i = 0; i < num_clients; i++) {
+    auto client = &clients[i];
+    if (!client->stream->Read(&client_status)) {
+      gpr_log(GPR_ERROR, "Couldn't get status from client %zu", i);
+    }
   }
 
   // Wait some time
@@ -390,37 +410,73 @@ std::unique_ptr<ScenarioResult> RunScenario(
   Histogram merged_latencies;
 
   gpr_log(GPR_INFO, "Finishing clients");
-  for (auto client = &clients[0]; client != &clients[num_clients]; client++) {
-    GPR_ASSERT(client->stream->Write(client_mark));
-    GPR_ASSERT(client->stream->WritesDone());
+  for (size_t i = 0; i < num_clients; i++) {
+    auto client = &clients[i];
+    if (!client->stream->Write(client_mark)) {
+      gpr_log(GPR_ERROR, "Couldn't write mark to client %zu", i);
+    }
+    if (!client->stream->WritesDone()) {
+      gpr_log(GPR_ERROR, "Failed WritesDone for client %zu", i);
+    }
   }
-  for (auto client = &clients[0]; client != &clients[num_clients]; client++) {
-    GPR_ASSERT(client->stream->Read(&client_status));
-    const auto& stats = client_status.stats();
-    merged_latencies.MergeProto(stats.latencies());
-    result->add_client_stats()->CopyFrom(stats);
-    GPR_ASSERT(!client->stream->Read(&client_status));
+  for (size_t i = 0; i < num_clients; i++) {
+    auto client = &clients[i];
+    // Read the client final status
+    if (client->stream->Read(&client_status)) {
+      gpr_log(GPR_INFO, "Received final status from client %zu", i);
+      const auto& stats = client_status.stats();
+      merged_latencies.MergeProto(stats.latencies());
+      result->add_client_stats()->CopyFrom(stats);
+      // That final status should be the last message on the client stream
+      GPR_ASSERT(!client->stream->Read(&client_status));
+    } else {
+      gpr_log(GPR_ERROR, "Couldn't get final status from client %zu", i);
+    }
   }
-  for (auto client = &clients[0]; client != &clients[num_clients]; client++) {
-    GPR_ASSERT(client->stream->Finish().ok());
+  for (size_t i = 0; i < num_clients; i++) {
+    auto client = &clients[i];
+    Status s = client->stream->Finish();
+    result->add_client_success(s.ok());
+    if (!s.ok()) {
+      gpr_log(GPR_ERROR, "Client %zu had an error %s", i,
+              s.error_message().c_str());
+    }
   }
   delete[] clients;
 
   merged_latencies.FillProto(result->mutable_latencies());
 
   gpr_log(GPR_INFO, "Finishing servers");
-  for (auto server = &servers[0]; server != &servers[num_servers]; server++) {
-    GPR_ASSERT(server->stream->Write(server_mark));
-    GPR_ASSERT(server->stream->WritesDone());
+  for (size_t i = 0; i < num_servers; i++) {
+    auto server = &servers[i];
+    if (!server->stream->Write(server_mark)) {
+      gpr_log(GPR_ERROR, "Couldn't write mark to server %zu", i);
+    }
+    if (!server->stream->WritesDone()) {
+      gpr_log(GPR_ERROR, "Failed WritesDone for server %zu", i);
+    }
   }
-  for (auto server = &servers[0]; server != &servers[num_servers]; server++) {
-    GPR_ASSERT(server->stream->Read(&server_status));
-    result->add_server_stats()->CopyFrom(server_status.stats());
-    result->add_server_cores(server_status.cores());
-    GPR_ASSERT(!server->stream->Read(&server_status));
+  for (size_t i = 0; i < num_servers; i++) {
+    auto server = &servers[i];
+    // Read the server final status
+    if (server->stream->Read(&server_status)) {
+      gpr_log(GPR_INFO, "Received final status from server %zu", i);
+      result->add_server_stats()->CopyFrom(server_status.stats());
+      result->add_server_cores(server_status.cores());
+      // That final status should be the last message on the server stream
+      GPR_ASSERT(!server->stream->Read(&server_status));
+    } else {
+      gpr_log(GPR_ERROR, "Couldn't get final status from server %zu", i);
+    }
   }
-  for (auto server = &servers[0]; server != &servers[num_servers]; server++) {
-    GPR_ASSERT(server->stream->Finish().ok());
+  for (size_t i = 0; i < num_servers; i++) {
+    auto server = &servers[i];
+    Status s = server->stream->Finish();
+    result->add_server_success(s.ok());
+    if (!s.ok()) {
+      gpr_log(GPR_ERROR, "Server %zu had an error %s", i,
+              s.error_message().c_str());
+    }
   }
 
   delete[] servers;
@@ -429,8 +485,9 @@ std::unique_ptr<ScenarioResult> RunScenario(
   return result;
 }
 
-void RunQuit() {
+bool RunQuit() {
   // Get client, server lists
+  bool result = true;
   auto workers = get_workers("QPS_WORKERS");
   for (size_t i = 0; i < workers.size(); i++) {
     auto stub = WorkerService::NewStub(
@@ -438,8 +495,14 @@ void RunQuit() {
     Void dummy;
     grpc::ClientContext ctx;
     ctx.set_fail_fast(false);
-    GPR_ASSERT(stub->QuitWorker(&ctx, dummy, &dummy).ok());
+    Status s = stub->QuitWorker(&ctx, dummy, &dummy);
+    if (!s.ok()) {
+      gpr_log(GPR_ERROR, "Worker %zu could not be shut down cleanly: %s", i,
+              s.error_message().c_str());
+      result = false;
+    }
   }
+  return result;
 }
 
 }  // namespace testing
diff --git a/test/cpp/qps/driver.h b/test/cpp/qps/driver.h
index 3a5cf138f11c3e2284af7edf2d08bd677a2133d7..93f4370cafa51787051e509f50b1100dd0c1dd6f 100644
--- a/test/cpp/qps/driver.h
+++ b/test/cpp/qps/driver.h
@@ -47,7 +47,7 @@ std::unique_ptr<ScenarioResult> RunScenario(
     const grpc::testing::ServerConfig& server_config, size_t num_servers,
     int warmup_seconds, int benchmark_seconds, int spawn_local_worker_count);
 
-void RunQuit();
+bool RunQuit();
 }  // namespace testing
 }  // namespace grpc
 
diff --git a/test/cpp/qps/qps_json_driver.cc b/test/cpp/qps/qps_json_driver.cc
index f5d739f893a45b7fb5a17d0fd0ebe5306467505b..1524ebbc38957756af3aad4def1c0e8d8eae4b2e 100644
--- a/test/cpp/qps/qps_json_driver.cc
+++ b/test/cpp/qps/qps_json_driver.cc
@@ -53,7 +53,7 @@ DEFINE_bool(quit, false, "Quit the workers");
 namespace grpc {
 namespace testing {
 
-static void QpsDriver() {
+static bool QpsDriver() {
   grpc::string json;
 
   bool scfile = (FLAGS_scenarios_file != "");
@@ -81,13 +81,13 @@ static void QpsDriver() {
   } else if (scjson) {
     json = FLAGS_scenarios_json.c_str();
   } else if (FLAGS_quit) {
-    RunQuit();
-    return;
+    return RunQuit();
   }
 
   // Parse into an array of scenarios
   Scenarios scenarios;
   ParseJson(json.c_str(), "grpc.testing.Scenarios", &scenarios);
+  bool success = true;
 
   // Make sure that there is at least some valid scenario here
   GPR_ASSERT(scenarios.scenarios_size() > 0);
@@ -109,7 +109,15 @@ static void QpsDriver() {
     GetReporter()->ReportQPSPerCore(*result);
     GetReporter()->ReportLatency(*result);
     GetReporter()->ReportTimes(*result);
+
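+    // A single failed client or server marks the whole run as unsuccessful.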
+    for (int i = 0; success && i < result->client_success_size(); i++) {
+      success = result->client_success(i);
+    }
+    for (int i = 0; success && i < result->server_success_size(); i++) {
+      success = result->server_success(i);
+    }
   }
+  return success;
 }
 
 }  // namespace testing
@@ -118,7 +126,7 @@ static void QpsDriver() {
 int main(int argc, char **argv) {
   grpc::testing::InitBenchmark(&argc, &argv, true);
 
-  grpc::testing::QpsDriver();
+  bool ok = grpc::testing::QpsDriver();
 
-  return 0;
+  return ok ? 0 : 1;
 }
diff --git a/test/cpp/qps/qps_worker.cc b/test/cpp/qps/qps_worker.cc
index f514e23e85404f933ed770524727dd18b6089549..d3e53fe14a63af8e1ee758af9f0d2211c50ee01d 100644
--- a/test/cpp/qps/qps_worker.cc
+++ b/test/cpp/qps/qps_worker.cc
@@ -33,7 +33,6 @@
 
 #include "test/cpp/qps/qps_worker.h"
 
-#include <cassert>
 #include <memory>
 #include <mutex>
 #include <sstream>
@@ -124,11 +123,12 @@ class WorkerServiceImpl GRPC_FINAL : public WorkerService::Service {
       GRPC_OVERRIDE {
     InstanceGuard g(this);
     if (!g.Acquired()) {
-      return Status(StatusCode::RESOURCE_EXHAUSTED, "");
+      return Status(StatusCode::RESOURCE_EXHAUSTED, "Client worker busy");
     }
 
     ScopedProfile profile("qps_client.prof", false);
     Status ret = RunClientBody(ctx, stream);
+    gpr_log(GPR_INFO, "RunClient: Returning");
     return ret;
   }
 
@@ -137,11 +137,12 @@ class WorkerServiceImpl GRPC_FINAL : public WorkerService::Service {
       GRPC_OVERRIDE {
     InstanceGuard g(this);
     if (!g.Acquired()) {
-      return Status(StatusCode::RESOURCE_EXHAUSTED, "");
+      return Status(StatusCode::RESOURCE_EXHAUSTED, "Server worker busy");
     }
 
     ScopedProfile profile("qps_server.prof", false);
     Status ret = RunServerBody(ctx, stream);
+    gpr_log(GPR_INFO, "RunServer: Returning");
     return ret;
   }
 
@@ -154,7 +155,7 @@ class WorkerServiceImpl GRPC_FINAL : public WorkerService::Service {
   Status QuitWorker(ServerContext* ctx, const Void*, Void*) GRPC_OVERRIDE {
     InstanceGuard g(this);
     if (!g.Acquired()) {
-      return Status(StatusCode::RESOURCE_EXHAUSTED, "");
+      return Status(StatusCode::RESOURCE_EXHAUSTED, "Quitting worker busy");
     }
 
     worker_->MarkDone();
@@ -197,33 +198,38 @@ class WorkerServiceImpl GRPC_FINAL : public WorkerService::Service {
                        ServerReaderWriter<ClientStatus, ClientArgs>* stream) {
     ClientArgs args;
     if (!stream->Read(&args)) {
-      return Status(StatusCode::INVALID_ARGUMENT, "");
+      return Status(StatusCode::INVALID_ARGUMENT, "Couldn't read args");
     }
     if (!args.has_setup()) {
-      return Status(StatusCode::INVALID_ARGUMENT, "");
+      return Status(StatusCode::INVALID_ARGUMENT, "Invalid setup arg");
     }
     gpr_log(GPR_INFO, "RunClientBody: about to create client");
     auto client = CreateClient(args.setup());
     if (!client) {
-      return Status(StatusCode::INVALID_ARGUMENT, "");
+      return Status(StatusCode::INVALID_ARGUMENT, "Couldn't create client");
     }
     gpr_log(GPR_INFO, "RunClientBody: client created");
     ClientStatus status;
     if (!stream->Write(status)) {
-      return Status(StatusCode::UNKNOWN, "");
+      return Status(StatusCode::UNKNOWN, "Client couldn't report init status");
     }
     gpr_log(GPR_INFO, "RunClientBody: creation status reported");
     while (stream->Read(&args)) {
       gpr_log(GPR_INFO, "RunClientBody: Message read");
       if (!args.has_mark()) {
         gpr_log(GPR_INFO, "RunClientBody: Message is not a mark!");
-        return Status(StatusCode::INVALID_ARGUMENT, "");
+        return Status(StatusCode::INVALID_ARGUMENT, "Invalid mark");
       }
       *status.mutable_stats() = client->Mark(args.mark().reset());
-      stream->Write(status);
+      if (!stream->Write(status)) {
+        return Status(StatusCode::UNKNOWN, "Client couldn't respond to mark");
+      }
       gpr_log(GPR_INFO, "RunClientBody: Mark response given");
     }
 
+    gpr_log(GPR_INFO, "RunClientBody: Awaiting Threads Completion");
+    client->AwaitThreadsCompletion();
+
     gpr_log(GPR_INFO, "RunClientBody: Returning");
     return Status::OK;
   }
@@ -232,10 +238,10 @@ class WorkerServiceImpl GRPC_FINAL : public WorkerService::Service {
                        ServerReaderWriter<ServerStatus, ServerArgs>* stream) {
     ServerArgs args;
     if (!stream->Read(&args)) {
-      return Status(StatusCode::INVALID_ARGUMENT, "");
+      return Status(StatusCode::INVALID_ARGUMENT, "Couldn't read server args");
     }
     if (!args.has_setup()) {
-      return Status(StatusCode::INVALID_ARGUMENT, "");
+      return Status(StatusCode::INVALID_ARGUMENT, "Bad server creation args");
     }
     if (server_port_ != 0) {
       args.mutable_setup()->set_port(server_port_);
@@ -243,24 +249,26 @@ class WorkerServiceImpl GRPC_FINAL : public WorkerService::Service {
     gpr_log(GPR_INFO, "RunServerBody: about to create server");
     auto server = CreateServer(args.setup());
     if (!server) {
-      return Status(StatusCode::INVALID_ARGUMENT, "");
+      return Status(StatusCode::INVALID_ARGUMENT, "Couldn't create server");
     }
     gpr_log(GPR_INFO, "RunServerBody: server created");
     ServerStatus status;
     status.set_port(server->port());
     status.set_cores(server->cores());
     if (!stream->Write(status)) {
-      return Status(StatusCode::UNKNOWN, "");
+      return Status(StatusCode::UNKNOWN, "Server couldn't report init status");
     }
     gpr_log(GPR_INFO, "RunServerBody: creation status reported");
     while (stream->Read(&args)) {
       gpr_log(GPR_INFO, "RunServerBody: Message read");
       if (!args.has_mark()) {
         gpr_log(GPR_INFO, "RunServerBody: Message not a mark!");
-        return Status(StatusCode::INVALID_ARGUMENT, "");
+        return Status(StatusCode::INVALID_ARGUMENT, "Invalid mark");
       }
       *status.mutable_stats() = server->Mark(args.mark().reset());
-      stream->Write(status);
+      if (!stream->Write(status)) {
+        return Status(StatusCode::UNKNOWN, "Server couldn't respond to mark");
+      }
       gpr_log(GPR_INFO, "RunServerBody: Mark response given");
     }
 
diff --git a/test/cpp/qps/server_async.cc b/test/cpp/qps/server_async.cc
index 73ca19148b56c320988d3676164241f7a0e5961d..dea87463312b56d817923e515590c85061874622 100644
--- a/test/cpp/qps/server_async.cc
+++ b/test/cpp/qps/server_async.cc
@@ -123,22 +123,24 @@ class AsyncQpsServerTest : public Server {
 
     for (int i = 0; i < num_threads; i++) {
       shutdown_state_.emplace_back(new PerThreadShutdownState());
-    }
-    for (int i = 0; i < num_threads; i++) {
       threads_.emplace_back(&AsyncQpsServerTest::ThreadFunc, this, i);
     }
   }
   ~AsyncQpsServerTest() {
     for (auto ss = shutdown_state_.begin(); ss != shutdown_state_.end(); ++ss) {
-      (*ss)->set_shutdown();
+      std::lock_guard<std::mutex> lock((*ss)->mutex);
+      (*ss)->shutdown = true;
+    }
+    // TODO (vpai): Remove this deadline and allow Shutdown to finish properly
+    auto deadline = std::chrono::system_clock::now() + std::chrono::seconds(3);
+    server_->Shutdown(deadline);
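+    // Shut down the completion queues so that the worker threads' Next()
+    // calls return and the joins below can complete.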
+    for (auto cq = srv_cqs_.begin(); cq != srv_cqs_.end(); ++cq) {
+      (*cq)->Shutdown();
     }
-    server_->Shutdown(std::chrono::system_clock::now() +
-                      std::chrono::seconds(3));
     for (auto thr = threads_.begin(); thr != threads_.end(); thr++) {
       thr->join();
     }
     for (auto cq = srv_cqs_.begin(); cq != srv_cqs_.end(); ++cq) {
-      (*cq)->Shutdown();
       bool ok;
       void *got_tag;
       while ((*cq)->Next(&got_tag, &ok))
@@ -151,22 +153,24 @@ class AsyncQpsServerTest : public Server {
   }
 
  private:
-  void ThreadFunc(int rank) {
+  void ThreadFunc(int thread_idx) {
     // Wait until work is available or we are shutting down
     bool ok;
     void *got_tag;
-    while (srv_cqs_[rank]->Next(&got_tag, &ok)) {
+    while (srv_cqs_[thread_idx]->Next(&got_tag, &ok)) {
       ServerRpcContext *ctx = detag(got_tag);
       // The tag is a pointer to an RPC context to invoke
-      const bool still_going = ctx->RunNextState(ok);
-      if (!shutdown_state_[rank]->shutdown()) {
-        // this RPC context is done, so refresh it
-        if (!still_going) {
-          ctx->Reset();
-        }
-      } else {
+      // Proceed while holding a lock to make sure that
+      // this thread isn't supposed to shut down
+      std::lock_guard<std::mutex> l(shutdown_state_[thread_idx]->mutex);
+      if (shutdown_state_[thread_idx]->shutdown) {
         return;
       }
+      const bool still_going = ctx->RunNextState(ok);
+      // if this RPC context is done, refresh it
+      if (!still_going) {
+        ctx->Reset();
+      }
     }
     return;
   }
@@ -334,24 +338,12 @@ class AsyncQpsServerTest : public Server {
   ServiceType async_service_;
   std::forward_list<ServerRpcContext *> contexts_;
 
-  class PerThreadShutdownState {
-   public:
-    PerThreadShutdownState() : shutdown_(false) {}
-
-    bool shutdown() const {
-      std::lock_guard<std::mutex> lock(mutex_);
-      return shutdown_;
-    }
-
-    void set_shutdown() {
-      std::lock_guard<std::mutex> lock(mutex_);
-      shutdown_ = true;
-    }
-
-   private:
-    mutable std::mutex mutex_;
-    bool shutdown_;
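+  // Plain data guarded by "mutex": ThreadFunc holds the lock for an entire
+  // loop iteration, so a shutdown request cannot race with RunNextState.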
+  struct PerThreadShutdownState {
+    mutable std::mutex mutex;
+    bool shutdown;
+    PerThreadShutdownState() : shutdown(false) {}
   };
+
   std::vector<std::unique_ptr<PerThreadShutdownState>> shutdown_state_;
 };