diff --git a/src/core/ext/client_config/subchannel_call_holder.c b/src/core/ext/client_config/subchannel_call_holder.c
index 9918fbdcb4fe3e37c8aa09fbc7344c89f4746666..91fa917661c8527bfc8ec98287fa04f6b71de66b 100644
--- a/src/core/ext/client_config/subchannel_call_holder.c
+++ b/src/core/ext/client_config/subchannel_call_holder.c
@@ -174,6 +174,7 @@ static void subchannel_ready(grpc_exec_ctx *exec_ctx, void *arg, bool success) {
              GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL);
   holder->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING;
   if (holder->connected_subchannel == NULL) {
+    gpr_atm_no_barrier_store(&holder->subchannel_call, 1);
     fail_locked(exec_ctx, holder);
   } else if (1 == gpr_atm_acq_load(&holder->subchannel_call)) {
     /* already cancelled before subchannel became ready */
diff --git a/src/csharp/Grpc.Core.Tests/Internal/AsyncCallTest.cs b/src/csharp/Grpc.Core.Tests/Internal/AsyncCallTest.cs
index abe9d4a2e628f74ae0ff0ccf985539e76558e4e5..777a1c8c500d9487b221b24142cd6b21a87ec52c 100644
--- a/src/csharp/Grpc.Core.Tests/Internal/AsyncCallTest.cs
+++ b/src/csharp/Grpc.Core.Tests/Internal/AsyncCallTest.cs
@@ -181,13 +181,14 @@ namespace Grpc.Core.Internal.Tests
         }
 
         [Test]
-        public void ClientStreaming_WriteFailure()
+        public void ClientStreaming_WriteCompletionFailure()
         {
             var resultTask = asyncCall.ClientStreamingCallAsync();
             var requestStream = new ClientRequestStream<string, string>(asyncCall);
 
             var writeTask = requestStream.WriteAsync("request1");
             fakeCall.SendCompletionHandler(false);
+            // TODO: maybe IOException or waiting for RPCException is more appropriate here.
             Assert.ThrowsAsync(typeof(InvalidOperationException), async () => await writeTask);
 
             fakeCall.UnaryResponseClientHandler(true,
@@ -199,7 +200,7 @@ namespace Grpc.Core.Internal.Tests
         }
 
         [Test]
-        public void ClientStreaming_WriteAfterReceivingStatusFails()
+        public void ClientStreaming_WriteAfterReceivingStatusThrowsRpcException()
         {
             var resultTask = asyncCall.ClientStreamingCallAsync();
             var requestStream = new ClientRequestStream<string, string>(asyncCall);
@@ -210,7 +211,44 @@ namespace Grpc.Core.Internal.Tests
                 new Metadata());
 
             AssertUnaryResponseSuccess(asyncCall, fakeCall, resultTask);
+            var ex = Assert.Throws<RpcException>(() => requestStream.WriteAsync("request1"));
+            Assert.AreEqual(Status.DefaultSuccess, ex.Status);
+        }
+
+        [Test]
+        public void ClientStreaming_WriteAfterReceivingStatusThrowsRpcException2()
+        {
+            var resultTask = asyncCall.ClientStreamingCallAsync();
+            var requestStream = new ClientRequestStream<string, string>(asyncCall);
+
+            fakeCall.UnaryResponseClientHandler(true,
+                new ClientSideStatus(new Status(StatusCode.OutOfRange, ""), new Metadata()),
+                CreateResponsePayload(),
+                new Metadata());
+
+            AssertUnaryResponseError(asyncCall, fakeCall, resultTask, StatusCode.OutOfRange);
+            var ex = Assert.Throws<RpcException>(() => requestStream.WriteAsync("request1"));
+            Assert.AreEqual(StatusCode.OutOfRange, ex.Status.StatusCode);
+        }
+
+        [Test]
+        public void ClientStreaming_WriteAfterCompleteThrowsInvalidOperationException()
+        {
+            var resultTask = asyncCall.ClientStreamingCallAsync();
+            var requestStream = new ClientRequestStream<string, string>(asyncCall);
+
+            requestStream.CompleteAsync();
+
             Assert.Throws(typeof(InvalidOperationException), () => requestStream.WriteAsync("request1"));
+
+            fakeCall.SendCompletionHandler(true);
+
+            fakeCall.UnaryResponseClientHandler(true,
+                new ClientSideStatus(Status.DefaultSuccess, new Metadata()),
+                CreateResponsePayload(),
+                new Metadata());
+
+            AssertUnaryResponseSuccess(asyncCall, fakeCall, resultTask);
         }
 
         [Test]
@@ -229,7 +267,7 @@ namespace Grpc.Core.Internal.Tests
         }
 
         [Test]
-        public void ClientStreaming_WriteAfterCancellationRequestFails()
+        public void ClientStreaming_WriteAfterCancellationRequestThrowsOperationCancelledException()
         {
             var resultTask = asyncCall.ClientStreamingCallAsync();
             var requestStream = new ClientRequestStream<string, string>(asyncCall);
@@ -340,7 +378,7 @@ namespace Grpc.Core.Internal.Tests
         }
 
         [Test]
-        public void DuplexStreaming_WriteAfterReceivingStatusFails()
+        public void DuplexStreaming_WriteAfterReceivingStatusThrowsRpcException()
         {
             asyncCall.StartDuplexStreamingCall();
             var requestStream = new ClientRequestStream<string, string>(asyncCall);
@@ -352,7 +390,8 @@ namespace Grpc.Core.Internal.Tests
 
             AssertStreamingResponseSuccess(asyncCall, fakeCall, readTask);
 
-            Assert.ThrowsAsync(typeof(InvalidOperationException), async () => await requestStream.WriteAsync("request1"));
+            var ex = Assert.ThrowsAsync<RpcException>(async () => await requestStream.WriteAsync("request1"));
+            Assert.AreEqual(Status.DefaultSuccess, ex.Status);
         }
 
         [Test]
@@ -372,7 +411,7 @@ namespace Grpc.Core.Internal.Tests
         }
 
         [Test]
-        public void DuplexStreaming_WriteAfterCancellationRequestFails()
+        public void DuplexStreaming_WriteAfterCancellationRequestThrowsOperationCancelledException()
         {
             asyncCall.StartDuplexStreamingCall();
             var requestStream = new ClientRequestStream<string, string>(asyncCall);
diff --git a/src/csharp/Grpc.Core/Internal/AsyncCall.cs b/src/csharp/Grpc.Core/Internal/AsyncCall.cs
index f522174bd0f9fe4748fd3b0fcb42e99ec1e39631..55351869b5c9948b76b65d71ee215efc9d2c8640 100644
--- a/src/csharp/Grpc.Core/Internal/AsyncCall.cs
+++ b/src/csharp/Grpc.Core/Internal/AsyncCall.cs
@@ -57,7 +57,7 @@ namespace Grpc.Core.Internal
         // Completion of a pending unary response if not null.
         TaskCompletionSource<TResponse> unaryResponseTcs;
 
-        // Indicates that steaming call has finished.
+        // Indicates that response streaming call has finished.
         TaskCompletionSource<object> streamingCallFinishedTcs = new TaskCompletionSource<object>();
 
         // Response headers set here once received.
@@ -443,6 +443,19 @@ namespace Grpc.Core.Internal
             }
         }
 
+        protected override void CheckSendingAllowed(bool allowFinished)
+        {
+            base.CheckSendingAllowed(true);
+
+            // throwing RpcException if we already received status on client
+            // side makes the most sense.
+            // Note that this throws even for StatusCode.OK.
+            if (!allowFinished && finishedStatus.HasValue)
+            {
+                throw new RpcException(finishedStatus.Value.Status);
+            }
+        }
+
         /// <summary>
         /// Handles receive status completion for calls with streaming response.
         /// </summary>
diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs
index 42234dcac217d819a68f4f0b57570b76b6e1f660..4de23706b28e5989e6288eb48d3fff7193be363a 100644
--- a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs
+++ b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs
@@ -213,7 +213,7 @@ namespace Grpc.Core.Internal
         {
         }
 
-        protected void CheckSendingAllowed(bool allowFinished)
+        protected virtual void CheckSendingAllowed(bool allowFinished)
         {
             GrpcPreconditions.CheckState(started);
             CheckNotCancelled();
diff --git a/src/csharp/Grpc.IntegrationTesting/InteropClient.cs b/src/csharp/Grpc.IntegrationTesting/InteropClient.cs
index b3b1abf1bcaaa71a96d16fdced4b3acb75aaea44..cff85086310711a3b2a88fad410f250552fe0cc5 100644
--- a/src/csharp/Grpc.IntegrationTesting/InteropClient.cs
+++ b/src/csharp/Grpc.IntegrationTesting/InteropClient.cs
@@ -492,6 +492,10 @@ namespace Grpc.IntegrationTesting
                 {
                     // Deadline was reached before write has started. Eat the exception and continue.
                 }
+                catch (RpcException)
+                {
+                    // Deadline was reached before write has started. Eat the exception and continue.
+                }
 
                 var ex = Assert.ThrowsAsync<RpcException>(async () => await call.ResponseStream.MoveNext());
                 // We can't guarantee the status code always DeadlineExceeded. See issue #2685.
diff --git a/test/cpp/interop/client.cc b/test/cpp/interop/client.cc
index 9af6a88044739d6f17b888276df16689843a1bb1..77278249794751b63a9c54ba0d5112a2da26d272 100644
--- a/test/cpp/interop/client.cc
+++ b/test/cpp/interop/client.cc
@@ -81,6 +81,14 @@ DEFINE_string(default_service_account, "",
 DEFINE_string(service_account_key_file, "",
               "Path to service account json key file.");
 DEFINE_string(oauth_scope, "", "Scope for OAuth tokens.");
+DEFINE_bool(do_not_abort_on_transient_failures, false,
+            "If set to 'true', abort() is not called in case of transient "
+            "failures (i.e failures that are temporary and will likely go away "
+            "on retrying; like a temporary connection failure) and an error "
+            "message is printed instead. Note that this flag just controls "
+            "whether abort() is called or not. It does not control whether the "
+            "test is retried in case of transient failures (and currently the "
+            "interop tests are not retried even if this flag is set to true)");
 
 using grpc::testing::CreateChannelForTestCase;
 using grpc::testing::GetServiceAccountJsonKey;
@@ -89,8 +97,9 @@ int main(int argc, char** argv) {
   grpc::testing::InitTest(&argc, &argv, true);
   gpr_log(GPR_INFO, "Testing these cases: %s", FLAGS_test_case.c_str());
   int ret = 0;
-  grpc::testing::InteropClient client(
-      CreateChannelForTestCase(FLAGS_test_case));
+  grpc::testing::InteropClient client(CreateChannelForTestCase(FLAGS_test_case),
+                                      true,
+                                      FLAGS_do_not_abort_on_transient_failures);
   if (FLAGS_test_case == "empty_unary") {
     client.DoEmpty();
   } else if (FLAGS_test_case == "large_unary") {
diff --git a/test/cpp/interop/interop_client.cc b/test/cpp/interop/interop_client.cc
index 22293d211facaee97b13837207908c4d8d8a9541..e9d95dc8d08bb28571a453b5099fd0397c77c41e 100644
--- a/test/cpp/interop/interop_client.cc
+++ b/test/cpp/interop/interop_client.cc
@@ -134,23 +134,43 @@ void InteropClient::Reset(std::shared_ptr<Channel> channel) {
   serviceStub_.Reset(channel);
 }
 
-InteropClient::InteropClient(std::shared_ptr<Channel> channel)
-    : serviceStub_(channel, true) {}
-
 InteropClient::InteropClient(std::shared_ptr<Channel> channel,
-                             bool new_stub_every_test_case)
-    : serviceStub_(channel, new_stub_every_test_case) {}
+                             bool new_stub_every_test_case,
+                             bool do_not_abort_on_transient_failures)
+    : serviceStub_(channel, new_stub_every_test_case),
+      do_not_abort_on_transient_failures_(do_not_abort_on_transient_failures) {}
 
-void InteropClient::AssertOkOrPrintErrorStatus(const Status& s) {
+bool InteropClient::AssertStatusOk(const Status& s) {
   if (s.ok()) {
-    return;
+    return true;
   }
-  gpr_log(GPR_ERROR, "Error status code: %d, message: %s", s.error_code(),
-          s.error_message().c_str());
-  GPR_ASSERT(0);
+
+  // Note: At this point, s.error_code is definitely not StatusCode::OK (we
+  // already checked for s.ok() above). So, the following will call abort()
+  // (unless s.error_code() corresponds to a transient failure and
+  // 'do_not_abort_on_transient_failures' is true)
+  return AssertStatusCode(s, StatusCode::OK);
 }
 
-void InteropClient::DoEmpty() {
+bool InteropClient::AssertStatusCode(const Status& s,
+                                     StatusCode expected_code) {
+  if (s.error_code() == expected_code) {
+    return true;
+  }
+
+  gpr_log(GPR_ERROR, "Error status code: %d (expected: %d), message: %s",
+          s.error_code(), expected_code, s.error_message().c_str());
+
+  // In case of transient transient/retryable failures (like a broken
+  // connection) we may or may not abort (see TransientFailureOrAbort())
+  if (s.error_code() == grpc::StatusCode::UNAVAILABLE) {
+    return TransientFailureOrAbort();
+  }
+
+  abort();
+}
+
+bool InteropClient::DoEmpty() {
   gpr_log(GPR_DEBUG, "Sending an empty rpc...");
 
   Empty request = Empty::default_instance();
@@ -158,17 +178,21 @@ void InteropClient::DoEmpty() {
   ClientContext context;
 
   Status s = serviceStub_.Get()->EmptyCall(&context, request, &response);
-  AssertOkOrPrintErrorStatus(s);
+
+  if (!AssertStatusOk(s)) {
+    return false;
+  }
 
   gpr_log(GPR_DEBUG, "Empty rpc done.");
+  return true;
 }
 
-void InteropClient::PerformLargeUnary(SimpleRequest* request,
+bool InteropClient::PerformLargeUnary(SimpleRequest* request,
                                       SimpleResponse* response) {
-  PerformLargeUnary(request, response, NoopChecks);
+  return PerformLargeUnary(request, response, NoopChecks);
 }
 
-void InteropClient::PerformLargeUnary(SimpleRequest* request,
+bool InteropClient::PerformLargeUnary(SimpleRequest* request,
                                       SimpleResponse* response,
                                       CheckerFn custom_checks_fn) {
   ClientContext context;
@@ -180,7 +204,9 @@ void InteropClient::PerformLargeUnary(SimpleRequest* request,
   request->mutable_payload()->set_body(payload.c_str(), kLargeRequestSize);
 
   Status s = serviceStub_.Get()->UnaryCall(&context, *request, response);
-  AssertOkOrPrintErrorStatus(s);
+  if (!AssertStatusOk(s)) {
+    return false;
+  }
 
   custom_checks_fn(inspector, request, response);
 
@@ -203,9 +229,11 @@ void InteropClient::PerformLargeUnary(SimpleRequest* request,
     default:
       GPR_ASSERT(false);
   }
+
+  return true;
 }
 
-void InteropClient::DoComputeEngineCreds(
+bool InteropClient::DoComputeEngineCreds(
     const grpc::string& default_service_account,
     const grpc::string& oauth_scope) {
   gpr_log(GPR_DEBUG,
@@ -215,7 +243,11 @@ void InteropClient::DoComputeEngineCreds(
   request.set_fill_username(true);
   request.set_fill_oauth_scope(true);
   request.set_response_type(PayloadType::COMPRESSABLE);
-  PerformLargeUnary(&request, &response);
+
+  if (!PerformLargeUnary(&request, &response)) {
+    return false;
+  }
+
   gpr_log(GPR_DEBUG, "Got username %s", response.username().c_str());
   gpr_log(GPR_DEBUG, "Got oauth_scope %s", response.oauth_scope().c_str());
   GPR_ASSERT(!response.username().empty());
@@ -224,9 +256,10 @@ void InteropClient::DoComputeEngineCreds(
   const char* oauth_scope_str = response.oauth_scope().c_str();
   GPR_ASSERT(oauth_scope.find(oauth_scope_str) != grpc::string::npos);
   gpr_log(GPR_DEBUG, "Large unary with compute engine creds done.");
+  return true;
 }
 
-void InteropClient::DoOauth2AuthToken(const grpc::string& username,
+bool InteropClient::DoOauth2AuthToken(const grpc::string& username,
                                       const grpc::string& oauth_scope) {
   gpr_log(GPR_DEBUG,
           "Sending a unary rpc with raw oauth2 access token credentials ...");
@@ -239,16 +272,20 @@ void InteropClient::DoOauth2AuthToken(const grpc::string& username,
 
   Status s = serviceStub_.Get()->UnaryCall(&context, request, &response);
 
-  AssertOkOrPrintErrorStatus(s);
+  if (!AssertStatusOk(s)) {
+    return false;
+  }
+
   GPR_ASSERT(!response.username().empty());
   GPR_ASSERT(!response.oauth_scope().empty());
   GPR_ASSERT(username == response.username());
   const char* oauth_scope_str = response.oauth_scope().c_str();
   GPR_ASSERT(oauth_scope.find(oauth_scope_str) != grpc::string::npos);
   gpr_log(GPR_DEBUG, "Unary with oauth2 access token credentials done.");
+  return true;
 }
 
-void InteropClient::DoPerRpcCreds(const grpc::string& json_key) {
+bool InteropClient::DoPerRpcCreds(const grpc::string& json_key) {
   gpr_log(GPR_DEBUG, "Sending a unary rpc with per-rpc JWT access token ...");
   SimpleRequest request;
   SimpleResponse response;
@@ -263,35 +300,47 @@ void InteropClient::DoPerRpcCreds(const grpc::string& json_key) {
 
   Status s = serviceStub_.Get()->UnaryCall(&context, request, &response);
 
-  AssertOkOrPrintErrorStatus(s);
+  if (!AssertStatusOk(s)) {
+    return false;
+  }
+
   GPR_ASSERT(!response.username().empty());
   GPR_ASSERT(json_key.find(response.username()) != grpc::string::npos);
   gpr_log(GPR_DEBUG, "Unary with per-rpc JWT access token done.");
+  return true;
 }
 
-void InteropClient::DoJwtTokenCreds(const grpc::string& username) {
+bool InteropClient::DoJwtTokenCreds(const grpc::string& username) {
   gpr_log(GPR_DEBUG,
           "Sending a large unary rpc with JWT token credentials ...");
   SimpleRequest request;
   SimpleResponse response;
   request.set_fill_username(true);
   request.set_response_type(PayloadType::COMPRESSABLE);
-  PerformLargeUnary(&request, &response);
+
+  if (!PerformLargeUnary(&request, &response)) {
+    return false;
+  }
+
   GPR_ASSERT(!response.username().empty());
   GPR_ASSERT(username.find(response.username()) != grpc::string::npos);
   gpr_log(GPR_DEBUG, "Large unary with JWT token creds done.");
+  return true;
 }
 
-void InteropClient::DoLargeUnary() {
+bool InteropClient::DoLargeUnary() {
   gpr_log(GPR_DEBUG, "Sending a large unary rpc...");
   SimpleRequest request;
   SimpleResponse response;
   request.set_response_type(PayloadType::COMPRESSABLE);
-  PerformLargeUnary(&request, &response);
+  if (!PerformLargeUnary(&request, &response)) {
+    return false;
+  }
   gpr_log(GPR_DEBUG, "Large unary done.");
+  return true;
 }
 
-void InteropClient::DoLargeCompressedUnary() {
+bool InteropClient::DoLargeCompressedUnary() {
   const CompressionType compression_types[] = {NONE, GZIP, DEFLATE};
   const PayloadType payload_types[] = {COMPRESSABLE, UNCOMPRESSABLE, RANDOM};
   for (size_t i = 0; i < GPR_ARRAY_SIZE(payload_types); i++) {
@@ -307,14 +356,32 @@ void InteropClient::DoLargeCompressedUnary() {
       SimpleResponse response;
       request.set_response_type(payload_types[i]);
       request.set_response_compression(compression_types[j]);
-      PerformLargeUnary(&request, &response, CompressionChecks);
+
+      if (!PerformLargeUnary(&request, &response, CompressionChecks)) {
+        gpr_log(GPR_ERROR, "Large compressed unary failed %s", log_suffix);
+        gpr_free(log_suffix);
+        return false;
+      }
+
       gpr_log(GPR_DEBUG, "Large compressed unary done %s.", log_suffix);
       gpr_free(log_suffix);
     }
   }
+
+  return true;
+}
+
+// Either abort() (unless do_not_abort_on_transient_failures_ is true) or return
+// false
+bool InteropClient::TransientFailureOrAbort() {
+  if (do_not_abort_on_transient_failures_) {
+    return false;
+  }
+
+  abort();
 }
 
-void InteropClient::DoRequestStreaming() {
+bool InteropClient::DoRequestStreaming() {
   gpr_log(GPR_DEBUG, "Sending request steaming rpc ...");
 
   ClientContext context;
@@ -328,18 +395,24 @@ void InteropClient::DoRequestStreaming() {
   for (unsigned int i = 0; i < request_stream_sizes.size(); ++i) {
     Payload* payload = request.mutable_payload();
     payload->set_body(grpc::string(request_stream_sizes[i], '\0'));
-    GPR_ASSERT(stream->Write(request));
+    if (!stream->Write(request)) {
+      gpr_log(GPR_ERROR, "DoRequestStreaming(): stream->Write() failed");
+      return TransientFailureOrAbort();
+    }
     aggregated_payload_size += request_stream_sizes[i];
   }
   stream->WritesDone();
+
   Status s = stream->Finish();
+  if (!AssertStatusOk(s)) {
+    return false;
+  }
 
   GPR_ASSERT(response.aggregated_payload_size() == aggregated_payload_size);
-  AssertOkOrPrintErrorStatus(s);
-  gpr_log(GPR_DEBUG, "Request streaming done.");
+  return true;
 }
 
-void InteropClient::DoResponseStreaming() {
+bool InteropClient::DoResponseStreaming() {
   gpr_log(GPR_DEBUG, "Receiving response steaming rpc ...");
 
   ClientContext context;
@@ -358,13 +431,27 @@ void InteropClient::DoResponseStreaming() {
                grpc::string(response_stream_sizes[i], '\0'));
     ++i;
   }
-  GPR_ASSERT(response_stream_sizes.size() == i);
+
+  if (i < response_stream_sizes.size()) {
+    // stream->Read() failed before reading all the expected messages. This is
+    // most likely due to connection failure.
+    gpr_log(GPR_ERROR,
+            "DoResponseStreaming(): Read fewer streams (%d) than "
+            "response_stream_sizes.size() (%d)",
+            i, response_stream_sizes.size());
+    return TransientFailureOrAbort();
+  }
+
   Status s = stream->Finish();
-  AssertOkOrPrintErrorStatus(s);
+  if (!AssertStatusOk(s)) {
+    return false;
+  }
+
   gpr_log(GPR_DEBUG, "Response streaming done.");
+  return true;
 }
 
-void InteropClient::DoResponseCompressedStreaming() {
+bool InteropClient::DoResponseCompressedStreaming() {
   const CompressionType compression_types[] = {NONE, GZIP, DEFLATE};
   const PayloadType payload_types[] = {COMPRESSABLE, UNCOMPRESSABLE, RANDOM};
   for (size_t i = 0; i < GPR_ARRAY_SIZE(payload_types); i++) {
@@ -432,17 +519,31 @@ void InteropClient::DoResponseCompressedStreaming() {
         ++k;
       }
 
-      GPR_ASSERT(response_stream_sizes.size() == k);
-      Status s = stream->Finish();
-
-      AssertOkOrPrintErrorStatus(s);
       gpr_log(GPR_DEBUG, "Response streaming done %s.", log_suffix);
       gpr_free(log_suffix);
+
+      if (k < response_stream_sizes.size()) {
+        // stream->Read() failed before reading all the expected messages. This
+        // is most likely due to a connection failure.
+        gpr_log(GPR_ERROR,
+                "DoResponseCompressedStreaming(): Responses read (k=%d) is "
+                "less than the expected messages (i.e "
+                "response_stream_sizes.size() (%d)). (i=%d, j=%d)",
+                k, response_stream_sizes.size(), i, j);
+        return TransientFailureOrAbort();
+      }
+
+      Status s = stream->Finish();
+      if (!AssertStatusOk(s)) {
+        return false;
+      }
     }
   }
+
+  return true;
 }
 
-void InteropClient::DoResponseStreamingWithSlowConsumer() {
+bool InteropClient::DoResponseStreamingWithSlowConsumer() {
   gpr_log(GPR_DEBUG, "Receiving response steaming rpc with slow consumer ...");
 
   ClientContext context;
@@ -464,14 +565,26 @@ void InteropClient::DoResponseStreamingWithSlowConsumer() {
     usleep(kReceiveDelayMilliSeconds * 1000);
     ++i;
   }
-  GPR_ASSERT(kNumResponseMessages == i);
+
+  if (i < kNumResponseMessages) {
+    gpr_log(GPR_ERROR,
+            "DoResponseStreamingWithSlowConsumer(): Responses read (i=%d) is "
+            "less than the expected messages (i.e kNumResponseMessages = %d)",
+            i, kNumResponseMessages);
+
+    return TransientFailureOrAbort();
+  }
+
   Status s = stream->Finish();
+  if (!AssertStatusOk(s)) {
+    return false;
+  }
 
-  AssertOkOrPrintErrorStatus(s);
   gpr_log(GPR_DEBUG, "Response streaming done.");
+  return true;
 }
 
-void InteropClient::DoHalfDuplex() {
+bool InteropClient::DoHalfDuplex() {
   gpr_log(GPR_DEBUG, "Sending half-duplex streaming rpc ...");
 
   ClientContext context;
@@ -483,7 +596,11 @@ void InteropClient::DoHalfDuplex() {
   ResponseParameters* response_parameter = request.add_response_parameters();
   for (unsigned int i = 0; i < response_stream_sizes.size(); ++i) {
     response_parameter->set_size(response_stream_sizes[i]);
-    GPR_ASSERT(stream->Write(request));
+
+    if (!stream->Write(request)) {
+      gpr_log(GPR_ERROR, "DoHalfDuplex(): stream->Write() failed. i=%d", i);
+      return TransientFailureOrAbort();
+    }
   }
   stream->WritesDone();
 
@@ -494,13 +611,27 @@ void InteropClient::DoHalfDuplex() {
                grpc::string(response_stream_sizes[i], '\0'));
     ++i;
   }
-  GPR_ASSERT(response_stream_sizes.size() == i);
+
+  if (i < response_stream_sizes.size()) {
+    // stream->Read() failed before reading all the expected messages. This is
+    // most likely due to a connection failure
+    gpr_log(GPR_ERROR,
+            "DoHalfDuplex(): Responses read (i=%d) are less than the expected "
+            "number of messages response_stream_sizes.size() (%d)",
+            i, response_stream_sizes.size());
+    return TransientFailureOrAbort();
+  }
+
   Status s = stream->Finish();
-  AssertOkOrPrintErrorStatus(s);
+  if (!AssertStatusOk(s)) {
+    return false;
+  }
+
   gpr_log(GPR_DEBUG, "Half-duplex streaming rpc done.");
+  return true;
 }
 
-void InteropClient::DoPingPong() {
+bool InteropClient::DoPingPong() {
   gpr_log(GPR_DEBUG, "Sending Ping Pong streaming rpc ...");
 
   ClientContext context;
@@ -513,23 +644,39 @@ void InteropClient::DoPingPong() {
   ResponseParameters* response_parameter = request.add_response_parameters();
   Payload* payload = request.mutable_payload();
   StreamingOutputCallResponse response;
+
   for (unsigned int i = 0; i < request_stream_sizes.size(); ++i) {
     response_parameter->set_size(response_stream_sizes[i]);
     payload->set_body(grpc::string(request_stream_sizes[i], '\0'));
-    GPR_ASSERT(stream->Write(request));
-    GPR_ASSERT(stream->Read(&response));
+
+    if (!stream->Write(request)) {
+      gpr_log(GPR_ERROR, "DoPingPong(): stream->Write() failed. i: %d", i);
+      return TransientFailureOrAbort();
+    }
+
+    if (!stream->Read(&response)) {
+      gpr_log(GPR_ERROR, "DoPingPong(): stream->Read() failed. i:%d", i);
+      return TransientFailureOrAbort();
+    }
+
     GPR_ASSERT(response.payload().body() ==
                grpc::string(response_stream_sizes[i], '\0'));
   }
 
   stream->WritesDone();
+
   GPR_ASSERT(!stream->Read(&response));
+
   Status s = stream->Finish();
-  AssertOkOrPrintErrorStatus(s);
+  if (!AssertStatusOk(s)) {
+    return false;
+  }
+
   gpr_log(GPR_DEBUG, "Ping pong streaming done.");
+  return true;
 }
 
-void InteropClient::DoCancelAfterBegin() {
+bool InteropClient::DoCancelAfterBegin() {
   gpr_log(GPR_DEBUG, "Sending request steaming rpc ...");
 
   ClientContext context;
@@ -542,11 +689,16 @@ void InteropClient::DoCancelAfterBegin() {
   gpr_log(GPR_DEBUG, "Trying to cancel...");
   context.TryCancel();
   Status s = stream->Finish();
-  GPR_ASSERT(s.error_code() == StatusCode::CANCELLED);
+
+  if (!AssertStatusCode(s, StatusCode::CANCELLED)) {
+    return false;
+  }
+
   gpr_log(GPR_DEBUG, "Canceling streaming done.");
+  return true;
 }
 
-void InteropClient::DoCancelAfterFirstResponse() {
+bool InteropClient::DoCancelAfterFirstResponse() {
   gpr_log(GPR_DEBUG, "Sending Ping Pong streaming rpc ...");
 
   ClientContext context;
@@ -560,17 +712,27 @@ void InteropClient::DoCancelAfterFirstResponse() {
   response_parameter->set_size(31415);
   request.mutable_payload()->set_body(grpc::string(27182, '\0'));
   StreamingOutputCallResponse response;
-  GPR_ASSERT(stream->Write(request));
-  GPR_ASSERT(stream->Read(&response));
+
+  if (!stream->Write(request)) {
+    gpr_log(GPR_ERROR, "DoCancelAfterFirstResponse(): stream->Write() failed");
+    return TransientFailureOrAbort();
+  }
+
+  if (!stream->Read(&response)) {
+    gpr_log(GPR_ERROR, "DoCancelAfterFirstResponse(): stream->Read failed");
+    return TransientFailureOrAbort();
+  }
   GPR_ASSERT(response.payload().body() == grpc::string(31415, '\0'));
+
   gpr_log(GPR_DEBUG, "Trying to cancel...");
   context.TryCancel();
 
   Status s = stream->Finish();
   gpr_log(GPR_DEBUG, "Canceling pingpong streaming done.");
+  return true;
 }
 
-void InteropClient::DoTimeoutOnSleepingServer() {
+bool InteropClient::DoTimeoutOnSleepingServer() {
   gpr_log(GPR_DEBUG,
           "Sending Ping Pong streaming rpc with a short deadline...");
 
@@ -587,11 +749,15 @@ void InteropClient::DoTimeoutOnSleepingServer() {
   stream->Write(request);
 
   Status s = stream->Finish();
-  GPR_ASSERT(s.error_code() == StatusCode::DEADLINE_EXCEEDED);
+  if (!AssertStatusCode(s, StatusCode::DEADLINE_EXCEEDED)) {
+    return false;
+  }
+
   gpr_log(GPR_DEBUG, "Pingpong streaming timeout done.");
+  return true;
 }
 
-void InteropClient::DoEmptyStream() {
+bool InteropClient::DoEmptyStream() {
   gpr_log(GPR_DEBUG, "Starting empty_stream.");
 
   ClientContext context;
@@ -601,12 +767,17 @@ void InteropClient::DoEmptyStream() {
   stream->WritesDone();
   StreamingOutputCallResponse response;
   GPR_ASSERT(stream->Read(&response) == false);
+
   Status s = stream->Finish();
-  AssertOkOrPrintErrorStatus(s);
+  if (!AssertStatusOk(s)) {
+    return false;
+  }
+
   gpr_log(GPR_DEBUG, "empty_stream done.");
+  return true;
 }
 
-void InteropClient::DoStatusWithMessage() {
+bool InteropClient::DoStatusWithMessage() {
   gpr_log(GPR_DEBUG,
           "Sending RPC with a request for status code 2 and message");
 
@@ -620,12 +791,16 @@ void InteropClient::DoStatusWithMessage() {
 
   Status s = serviceStub_.Get()->UnaryCall(&context, request, &response);
 
-  GPR_ASSERT(s.error_code() == grpc::StatusCode::UNKNOWN);
+  if (!AssertStatusCode(s, grpc::StatusCode::UNKNOWN)) {
+    return false;
+  }
+
   GPR_ASSERT(s.error_message() == test_msg);
   gpr_log(GPR_DEBUG, "Done testing Status and Message");
+  return true;
 }
 
-void InteropClient::DoCustomMetadata() {
+bool InteropClient::DoCustomMetadata() {
   const grpc::string kEchoInitialMetadataKey("x-grpc-test-echo-initial");
   const grpc::string kInitialMetadataValue("test_initial_metadata_value");
   const grpc::string kEchoTrailingBinMetadataKey(
@@ -645,7 +820,10 @@ void InteropClient::DoCustomMetadata() {
     request.mutable_payload()->set_body(payload.c_str(), kLargeRequestSize);
 
     Status s = serviceStub_.Get()->UnaryCall(&context, request, &response);
-    AssertOkOrPrintErrorStatus(s);
+    if (!AssertStatusOk(s)) {
+      return false;
+    }
+
     const auto& server_initial_metadata = context.GetServerInitialMetadata();
     auto iter = server_initial_metadata.find(kEchoInitialMetadataKey);
     GPR_ASSERT(iter != server_initial_metadata.end());
@@ -675,14 +853,29 @@ void InteropClient::DoCustomMetadata() {
     grpc::string payload(kLargeRequestSize, '\0');
     request.mutable_payload()->set_body(payload.c_str(), kLargeRequestSize);
     StreamingOutputCallResponse response;
-    GPR_ASSERT(stream->Write(request));
+
+    if (!stream->Write(request)) {
+      gpr_log(GPR_ERROR, "DoCustomMetadata(): stream->Write() failed");
+      return TransientFailureOrAbort();
+    }
+
     stream->WritesDone();
-    GPR_ASSERT(stream->Read(&response));
+
+    if (!stream->Read(&response)) {
+      gpr_log(GPR_ERROR, "DoCustomMetadata(): stream->Read() failed");
+      return TransientFailureOrAbort();
+    }
+
     GPR_ASSERT(response.payload().body() ==
                grpc::string(kLargeResponseSize, '\0'));
+
     GPR_ASSERT(!stream->Read(&response));
+
     Status s = stream->Finish();
-    AssertOkOrPrintErrorStatus(s);
+    if (!AssertStatusOk(s)) {
+      return false;
+    }
+
     const auto& server_initial_metadata = context.GetServerInitialMetadata();
     auto iter = server_initial_metadata.find(kEchoInitialMetadataKey);
     GPR_ASSERT(iter != server_initial_metadata.end());
@@ -695,6 +888,8 @@ void InteropClient::DoCustomMetadata() {
 
     gpr_log(GPR_DEBUG, "Done testing stream with custom metadata");
   }
+
+  return true;
 }
 
 }  // namespace testing
diff --git a/test/cpp/interop/interop_client.h b/test/cpp/interop/interop_client.h
index a3794fd93feb785dbada780f65b755f25d417fe3..ae75762bb8f634b3598aba4c3195270f5e7eb76c 100644
--- a/test/cpp/interop/interop_client.h
+++ b/test/cpp/interop/interop_client.h
@@ -51,41 +51,42 @@ using CheckerFn =
 
 class InteropClient {
  public:
-  explicit InteropClient(std::shared_ptr<Channel> channel);
-  explicit InteropClient(
-      std::shared_ptr<Channel> channel,
-      bool new_stub_every_test_case);  // If new_stub_every_test_case is true,
-                                       // a new TestService::Stub object is
-                                       // created for every test case below
+  /// If new_stub_every_test_case is true, a new TestService::Stub object is
+  /// created for every test case
+  /// If do_not_abort_on_transient_failures is true, abort() is not called in
+  /// case of transient failures (like connection failures)
+  explicit InteropClient(std::shared_ptr<Channel> channel,
+                         bool new_stub_every_test_case,
+                         bool do_not_abort_on_transient_failures);
   ~InteropClient() {}
 
   void Reset(std::shared_ptr<Channel> channel);
 
-  void DoEmpty();
-  void DoLargeUnary();
-  void DoLargeCompressedUnary();
-  void DoPingPong();
-  void DoHalfDuplex();
-  void DoRequestStreaming();
-  void DoResponseStreaming();
-  void DoResponseCompressedStreaming();
-  void DoResponseStreamingWithSlowConsumer();
-  void DoCancelAfterBegin();
-  void DoCancelAfterFirstResponse();
-  void DoTimeoutOnSleepingServer();
-  void DoEmptyStream();
-  void DoStatusWithMessage();
-  void DoCustomMetadata();
+  bool DoEmpty();
+  bool DoLargeUnary();
+  bool DoLargeCompressedUnary();
+  bool DoPingPong();
+  bool DoHalfDuplex();
+  bool DoRequestStreaming();
+  bool DoResponseStreaming();
+  bool DoResponseCompressedStreaming();
+  bool DoResponseStreamingWithSlowConsumer();
+  bool DoCancelAfterBegin();
+  bool DoCancelAfterFirstResponse();
+  bool DoTimeoutOnSleepingServer();
+  bool DoEmptyStream();
+  bool DoStatusWithMessage();
+  bool DoCustomMetadata();
   // Auth tests.
   // username is a string containing the user email
-  void DoJwtTokenCreds(const grpc::string& username);
-  void DoComputeEngineCreds(const grpc::string& default_service_account,
+  bool DoJwtTokenCreds(const grpc::string& username);
+  bool DoComputeEngineCreds(const grpc::string& default_service_account,
                             const grpc::string& oauth_scope);
   // username the GCE default service account email
-  void DoOauth2AuthToken(const grpc::string& username,
+  bool DoOauth2AuthToken(const grpc::string& username,
                          const grpc::string& oauth_scope);
   // username is a string containing the user email
-  void DoPerRpcCreds(const grpc::string& json_key);
+  bool DoPerRpcCreds(const grpc::string& json_key);
 
  private:
   class ServiceStub {
@@ -105,13 +106,18 @@ class InteropClient {
                                 // Get() call
   };
 
-  void PerformLargeUnary(SimpleRequest* request, SimpleResponse* response);
+  bool PerformLargeUnary(SimpleRequest* request, SimpleResponse* response);
 
   /// Run \a custom_check_fn as an additional check.
-  void PerformLargeUnary(SimpleRequest* request, SimpleResponse* response,
+  bool PerformLargeUnary(SimpleRequest* request, SimpleResponse* response,
                          CheckerFn custom_checks_fn);
-  void AssertOkOrPrintErrorStatus(const Status& s);
+  bool AssertStatusOk(const Status& s);
+  bool AssertStatusCode(const Status& s, StatusCode expected_code);
+  bool TransientFailureOrAbort();
   ServiceStub serviceStub_;
+
+  /// If true, abort() is not called for transient failures
+  bool do_not_abort_on_transient_failures_;
 };
 
 }  // namespace testing
diff --git a/test/cpp/interop/stress_interop_client.cc b/test/cpp/interop/stress_interop_client.cc
index f287a5aa3b47386e2ecd6992013bc17725a5c2ca..aa95682e7413598b360b06312fbc99175c0eabf0 100644
--- a/test/cpp/interop/stress_interop_client.cc
+++ b/test/cpp/interop/stress_interop_client.cc
@@ -84,11 +84,12 @@ StressTestInteropClient::StressTestInteropClient(
     int test_id, const grpc::string& server_address,
     std::shared_ptr<Channel> channel,
     const WeightedRandomTestSelector& test_selector, long test_duration_secs,
-    long sleep_duration_ms)
+    long sleep_duration_ms, bool do_not_abort_on_transient_failures)
     : test_id_(test_id),
       server_address_(server_address),
       channel_(channel),
-      interop_client_(new InteropClient(channel, false)),
+      interop_client_(new InteropClient(channel, false,
+                                        do_not_abort_on_transient_failures)),
       test_selector_(test_selector),
       test_duration_secs_(test_duration_secs),
       sleep_duration_ms_(sleep_duration_ms) {}
@@ -126,31 +127,67 @@ void StressTestInteropClient::MainLoop(std::shared_ptr<QpsGauge> qps_gauge) {
   }
 }
 
-// TODO(sree): Add all interop tests
-void StressTestInteropClient::RunTest(TestCaseType test_case) {
+bool StressTestInteropClient::RunTest(TestCaseType test_case) {
+  bool is_success = false;
   switch (test_case) {
     case EMPTY_UNARY: {
-      interop_client_->DoEmpty();
+      is_success = interop_client_->DoEmpty();
       break;
     }
     case LARGE_UNARY: {
-      interop_client_->DoLargeUnary();
+      is_success = interop_client_->DoLargeUnary();
       break;
     }
     case LARGE_COMPRESSED_UNARY: {
-      interop_client_->DoLargeCompressedUnary();
+      is_success = interop_client_->DoLargeCompressedUnary();
       break;
     }
     case CLIENT_STREAMING: {
-      interop_client_->DoRequestStreaming();
+      is_success = interop_client_->DoRequestStreaming();
       break;
     }
     case SERVER_STREAMING: {
-      interop_client_->DoResponseStreaming();
+      is_success = interop_client_->DoResponseStreaming();
+      break;
+    }
+    case SERVER_COMPRESSED_STREAMING: {
+      is_success = interop_client_->DoResponseCompressedStreaming();
+      break;
+    }
+    case SLOW_CONSUMER: {
+      is_success = interop_client_->DoResponseStreamingWithSlowConsumer();
+      break;
+    }
+    case HALF_DUPLEX: {
+      is_success = interop_client_->DoHalfDuplex();
+      break;
+    }
+    case PING_PONG: {
+      is_success = interop_client_->DoPingPong();
+      break;
+    }
+    case CANCEL_AFTER_BEGIN: {
+      is_success = interop_client_->DoCancelAfterBegin();
+      break;
+    }
+    case CANCEL_AFTER_FIRST_RESPONSE: {
+      is_success = interop_client_->DoCancelAfterFirstResponse();
+      break;
+    }
+    case TIMEOUT_ON_SLEEPING_SERVER: {
+      is_success = interop_client_->DoTimeoutOnSleepingServer();
       break;
     }
     case EMPTY_STREAM: {
-      interop_client_->DoEmptyStream();
+      is_success = interop_client_->DoEmptyStream();
+      break;
+    }
+    case STATUS_CODE_AND_MESSAGE: {
+      is_success = interop_client_->DoStatusWithMessage();
+      break;
+    }
+    case CUSTOM_METADATA: {
+      is_success = interop_client_->DoCustomMetadata();
       break;
     }
     default: {
@@ -159,6 +196,8 @@ void StressTestInteropClient::RunTest(TestCaseType test_case) {
       break;
     }
   }
+
+  return is_success;
 }
 
 }  // namespace testing
diff --git a/test/cpp/interop/stress_interop_client.h b/test/cpp/interop/stress_interop_client.h
index cb0cd988214e5298e39955a8087169a279207263..aa93b58b4a75b233d0c10ce55df4c2c3c9dbdbd1 100644
--- a/test/cpp/interop/stress_interop_client.h
+++ b/test/cpp/interop/stress_interop_client.h
@@ -49,7 +49,6 @@ namespace testing {
 using std::pair;
 using std::vector;
 
-// TODO(sreek): Add more test cases here in future
 enum TestCaseType {
   UNKNOWN_TEST = -1,
   EMPTY_UNARY = 0,
@@ -57,7 +56,16 @@ enum TestCaseType {
   LARGE_COMPRESSED_UNARY = 2,
   CLIENT_STREAMING = 3,
   SERVER_STREAMING = 4,
-  EMPTY_STREAM = 5
+  SERVER_COMPRESSED_STREAMING = 5,
+  SLOW_CONSUMER = 6,
+  HALF_DUPLEX = 7,
+  PING_PONG = 8,
+  CANCEL_AFTER_BEGIN = 9,
+  CANCEL_AFTER_FIRST_RESPONSE = 10,
+  TIMEOUT_ON_SLEEPING_SERVER = 11,
+  EMPTY_STREAM = 12,
+  STATUS_CODE_AND_MESSAGE = 13,
+  CUSTOM_METADATA = 14
 };
 
 const vector<pair<TestCaseType, grpc::string>> kTestCaseList = {
@@ -66,7 +74,16 @@ const vector<pair<TestCaseType, grpc::string>> kTestCaseList = {
     {LARGE_COMPRESSED_UNARY, "large_compressed_unary"},
     {CLIENT_STREAMING, "client_streaming"},
     {SERVER_STREAMING, "server_streaming"},
-    {EMPTY_STREAM, "empty_stream"}};
+    {SERVER_COMPRESSED_STREAMING, "server_compressed_streaming"},
+    {SLOW_CONSUMER, "slow_consumer"},
+    {HALF_DUPLEX, "half_duplex"},
+    {PING_PONG, "ping_pong"},
+    {CANCEL_AFTER_BEGIN, "cancel_after_begin"},
+    {CANCEL_AFTER_FIRST_RESPONSE, "cancel_after_first_response"},
+    {TIMEOUT_ON_SLEEPING_SERVER, "timeout_on_sleeping_server"},
+    {EMPTY_STREAM, "empty_stream"},
+    {STATUS_CODE_AND_MESSAGE, "status_code_and_message"},
+    {CUSTOM_METADATA, "custom_metadata"}};
 
 class WeightedRandomTestSelector {
  public:
@@ -87,14 +104,15 @@ class StressTestInteropClient {
   StressTestInteropClient(int test_id, const grpc::string& server_address,
                           std::shared_ptr<Channel> channel,
                           const WeightedRandomTestSelector& test_selector,
-                          long test_duration_secs, long sleep_duration_ms);
+                          long test_duration_secs, long sleep_duration_ms,
+                          bool do_not_abort_on_transient_failures);
 
   // The main function. Use this as the thread entry point.
   // qps_gauge is the QpsGauge to record the requests per second metric
   void MainLoop(std::shared_ptr<QpsGauge> qps_gauge);
 
  private:
-  void RunTest(TestCaseType test_case);
+  bool RunTest(TestCaseType test_case);
 
   int test_id_;
   const grpc::string& server_address_;
diff --git a/test/cpp/interop/stress_test.cc b/test/cpp/interop/stress_test.cc
index d9e3fd25c512960837171939f8b00c6a00733575..77879319003f281692ef05a56fb9efbcc174e820 100644
--- a/test/cpp/interop/stress_test.cc
+++ b/test/cpp/interop/stress_test.cc
@@ -89,7 +89,16 @@ DEFINE_string(test_cases, "",
               "   large_compressed_unary\n"
               "   client_streaming\n"
               "   server_streaming\n"
+              "   server_compressed_streaming\n"
+              "   slow_consumer\n"
+              "   half_duplex\n"
+              "   ping_pong\n"
+              "   cancel_after_begin\n"
+              "   cancel_after_first_response\n"
+              "   timeout_on_sleeping_server\n"
               "   empty_stream\n"
+              "   status_code_and_message\n"
+              "   custom_metadata\n"
               " Example: \"empty_unary:20,large_unary:10,empty_stream:70\"\n"
               " The above will execute 'empty_unary', 20% of the time,"
               " 'large_unary', 10% of the time and 'empty_stream' the remaining"
@@ -101,6 +110,10 @@ DEFINE_int32(log_level, GPR_LOG_SEVERITY_INFO,
              "The choices are: 0 (GPR_LOG_SEVERITY_DEBUG), 1 "
              "(GPR_LOG_SEVERITY_INFO) and 2 (GPR_LOG_SEVERITY_ERROR)");
 
+DEFINE_bool(do_not_abort_on_transient_failures, true,
+            "If set to 'true', abort() is not called in case of transient "
+            "failures like temporary connection failures.");
+
 using grpc::testing::kTestCaseList;
 using grpc::testing::MetricsService;
 using grpc::testing::MetricsServiceImpl;
@@ -189,6 +202,12 @@ void LogParameterInfo(const std::vector<grpc::string>& addresses,
   gpr_log(GPR_INFO, "test_cases : %s", FLAGS_test_cases.c_str());
   gpr_log(GPR_INFO, "sleep_duration_ms: %d", FLAGS_sleep_duration_ms);
   gpr_log(GPR_INFO, "test_duration_secs: %d", FLAGS_test_duration_secs);
+  gpr_log(GPR_INFO, "num_channels_per_server: %d",
+          FLAGS_num_channels_per_server);
+  gpr_log(GPR_INFO, "num_stubs_per_channel: %d", FLAGS_num_stubs_per_channel);
+  gpr_log(GPR_INFO, "log_level: %d", FLAGS_log_level);
+  gpr_log(GPR_INFO, "do_not_abort_on_transient_failures: %s",
+          FLAGS_do_not_abort_on_transient_failures ? "true" : "false");
 
   int num = 0;
   for (auto it = addresses.begin(); it != addresses.end(); it++) {
@@ -272,7 +291,7 @@ int main(int argc, char** argv) {
            stub_idx++) {
         StressTestInteropClient* client = new StressTestInteropClient(
             ++thread_idx, *it, channel, test_selector, FLAGS_test_duration_secs,
-            FLAGS_sleep_duration_ms);
+            FLAGS_sleep_duration_ms, FLAGS_do_not_abort_on_transient_failures);
 
         bool is_already_created = false;
         // QpsGauge name
diff --git a/tools/gce/create_linux_performance_worker.sh b/tools/gce/create_linux_performance_worker.sh
index 96d5558d9a1642d27aca6a4ce03ec460ec2130c7..c9a0ffa4e100f8b44fdfa931c92c956e4ea2ca27 100755
--- a/tools/gce/create_linux_performance_worker.sh
+++ b/tools/gce/create_linux_performance_worker.sh
@@ -50,7 +50,7 @@ gcloud compute instances create $INSTANCE_NAME \
     --machine-type $MACHINE_TYPE \
     --image ubuntu-15-10 \
     --boot-disk-size 300 \
-    --scope https://www.googleapis.com/auth/bigquery
+    --scopes https://www.googleapis.com/auth/bigquery
 
 echo 'Created GCE instance, waiting 60 seconds for it to come online.'
 sleep 60
diff --git a/tools/gce/linux_performance_worker_init.sh b/tools/gce/linux_performance_worker_init.sh
index 96e8a1353cbe3db13b6cb1e7e37fde7eb4675747..dc4784262e7075306b80c4e23220c61ef9a56575 100755
--- a/tools/gce/linux_performance_worker_init.sh
+++ b/tools/gce/linux_performance_worker_init.sh
@@ -77,6 +77,9 @@ sudo apt-get install -y \
 # perftools
 sudo apt-get install -y google-perftools libgoogle-perftools-dev
 
+# netperf
+sudo apt-get install -y netperf
+
 # C++ dependencies
 sudo apt-get install -y libgflags-dev libgtest-dev libc++-dev clang
 
diff --git a/tools/run_tests/performance/bq_upload_result.py b/tools/run_tests/performance/bq_upload_result.py
index ebd28f75918c480816a66f9e938aa3fb072b41f7..fbccf3bdcabd6410d07dc5455812d7c265a222dc 100755
--- a/tools/run_tests/performance/bq_upload_result.py
+++ b/tools/run_tests/performance/bq_upload_result.py
@@ -48,20 +48,47 @@ import big_query_utils
 _PROJECT_ID='grpc-testing'
 
 
-def _upload_scenario_result_to_bigquery(dataset_id, table_id, result_file):
+def _upload_netperf_latency_csv_to_bigquery(dataset_id, table_id, result_file):
+  with open(result_file, 'r') as f:
+    (col1, col2, col3) = f.read().split(',')
+    latency50 = float(col1.strip()) * 1000
+    latency90 = float(col2.strip()) * 1000
+    latency99 = float(col3.strip()) * 1000
+
+    scenario_result = {
+        'scenario': {
+          'name': 'netperf_tcp_rr'
+        },
+        'summary': {
+          'latency50': latency50,
+          'latency90': latency90,
+          'latency99': latency99
+        }
+    }
+
   bq = big_query_utils.create_big_query()
   _create_results_table(bq, dataset_id, table_id)
 
+  if not _insert_result(bq, dataset_id, table_id, scenario_result, flatten=False):
+    print 'Error uploading result to bigquery.'
+    sys.exit(1)
+
+
+def _upload_scenario_result_to_bigquery(dataset_id, table_id, result_file):
   with open(result_file, 'r') as f:
     scenario_result = json.loads(f.read())
 
+  bq = big_query_utils.create_big_query()
+  _create_results_table(bq, dataset_id, table_id)
+
   if not _insert_result(bq, dataset_id, table_id, scenario_result):
     print 'Error uploading result to bigquery.'
     sys.exit(1)
 
 
-def _insert_result(bq, dataset_id, table_id, scenario_result):
-  _flatten_result_inplace(scenario_result)
+def _insert_result(bq, dataset_id, table_id, scenario_result, flatten=True):
+  if flatten:
+    _flatten_result_inplace(scenario_result)
   _populate_metadata_inplace(scenario_result)
   row = big_query_utils.make_row(str(uuid.uuid4()), scenario_result)
   return big_query_utils.insert_rows(bq,
@@ -127,9 +154,17 @@ argp.add_argument('--bq_result_table', required=True, default=None, type=str,
                   help='Bigquery "dataset.table" to upload results to.')
 argp.add_argument('--file_to_upload', default='scenario_result.json', type=str,
                   help='Report file to upload.')
+argp.add_argument('--file_format',
+                  choices=['scenario_result','netperf_latency_csv'],
+                  default='scenario_result',
+                  help='Format of the file to upload.')
 
 args = argp.parse_args()
 
 dataset_id, table_id = args.bq_result_table.split('.', 2)
-_upload_scenario_result_to_bigquery(dataset_id, table_id, args.file_to_upload)
+
+if args.file_format == 'netperf_latency_csv':
+  _upload_netperf_latency_csv_to_bigquery(dataset_id, table_id, args.file_to_upload)
+else:
+  _upload_scenario_result_to_bigquery(dataset_id, table_id, args.file_to_upload)
 print 'Successfully uploaded %s to BigQuery.\n' % args.file_to_upload
diff --git a/tools/run_tests/performance/run_netperf.sh b/tools/run_tests/performance/run_netperf.sh
new file mode 100755
index 0000000000000000000000000000000000000000..298edbe0c3a33b04ace7e48af564b8ef12394fc2
--- /dev/null
+++ b/tools/run_tests/performance/run_netperf.sh
@@ -0,0 +1,45 @@
+#!/bin/bash
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+#     * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+#     * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+#     * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+set -ex
+
+cd $(dirname $0)/../../..
+
+netperf >netperf_latency.txt -P 0 -t TCP_RR -H "$NETPERF_SERVER_HOST" -- -r 1,1 -o P50_LATENCY,P90_LATENCY,P99_LATENCY
+
+cat netperf_latency.txt
+
+if [ "$BQ_RESULT_TABLE" != "" ]
+then
+  tools/run_tests/performance/bq_upload_result.py \
+      --file_to_upload=netperf_latency.txt \
+      --file_format=netperf_latency_csv \
+      --bq_result_table="$BQ_RESULT_TABLE"
+fi
diff --git a/tools/run_tests/run_performance_tests.py b/tools/run_tests/run_performance_tests.py
index b1f5889e5415cb797921c230303cb48ff3d3ffee..674d8645396b0b76adcca5b0f6611b2d979847f6 100755
--- a/tools/run_tests/run_performance_tests.py
+++ b/tools/run_tests/run_performance_tests.py
@@ -131,6 +131,25 @@ def create_quit_jobspec(workers, remote_host=None):
       verbose_success=True)
 
 
+def create_netperf_jobspec(server_host='localhost', client_host=None,
+                           bq_result_table=None):
+  """Runs netperf benchmark."""
+  cmd = 'NETPERF_SERVER_HOST="%s" ' % server_host
+  if bq_result_table:
+    cmd += 'BQ_RESULT_TABLE="%s" ' % bq_result_table
+  cmd += 'tools/run_tests/performance/run_netperf.sh'
+  if client_host:
+    user_at_host = '%s@%s' % (_REMOTE_HOST_USERNAME, client_host)
+    cmd = 'ssh %s "cd ~/performance_workspace/grpc/ && "%s' % (user_at_host, pipes.quote(cmd))
+
+  return jobset.JobSpec(
+      cmdline=[cmd],
+      shortname='netperf',
+      timeout_seconds=60,
+      shell=True,
+      verbose_success=True)
+
+
 def archive_repo(languages):
   """Archives local version of repo including submodules."""
   cmdline=['tar', '-cf', '../grpc.tar', '../grpc/']
@@ -244,12 +263,28 @@ def start_qpsworkers(languages, worker_hosts):
 
 
 def create_scenarios(languages, workers_by_lang, remote_host=None, regex='.*',
-                     category='all', bq_result_table=None):
+                     category='all', bq_result_table=None,
+                     netperf=False, netperf_hosts=[]):
   """Create jobspecs for scenarios to run."""
   all_workers = [worker
                  for workers in workers_by_lang.values()
                  for worker in workers]
   scenarios = []
+
+  if netperf:
+    if not netperf_hosts:
+      netperf_server='localhost'
+      netperf_client=None
+    elif len(netperf_hosts) == 1:
+      netperf_server=netperf_hosts[0]
+      netperf_client=netperf_hosts[0]
+    else:
+      netperf_server=netperf_hosts[0]
+      netperf_client=netperf_hosts[1]
+    scenarios.append(create_netperf_jobspec(server_host=netperf_server,
+                                            client_host=netperf_client,
+                                            bq_result_table=bq_result_table))
+
   for language in languages:
     for scenario_json in language.scenarios():
       if re.search(args.regex, scenario_json['name']):
@@ -316,6 +351,11 @@ argp.add_argument('--category',
                   choices=['smoketest','all'],
                   default='smoketest',
                   help='Select a category of tests to run. Smoketest runs by default.')
+argp.add_argument('--netperf',
+                  default=False,
+                  action='store_const',
+                  const=True,
+                  help='Run netperf benchmark as one of the scenarios.')
 
 args = argp.parse_args()
 
@@ -360,7 +400,10 @@ try:
                                remote_host=args.remote_driver_host,
                                regex=args.regex,
                                category=args.category,
-                               bq_result_table=args.bq_result_table)
+                               bq_result_table=args.bq_result_table,
+                               netperf=args.netperf,
+                               netperf_hosts=args.remote_worker_host)
+
   if not scenarios:
     raise Exception('No scenarios to run')