diff --git a/test/cpp/end2end/end2end_test.cc b/test/cpp/end2end/end2end_test.cc
index 2659b0e2132f3435d8372021f6eec00df2a72eb8..448ea253c2730d1dd73fda7120bd5aede1f20c3e 100644
--- a/test/cpp/end2end/end2end_test.cc
+++ b/test/cpp/end2end/end2end_test.cc
@@ -195,8 +195,6 @@ const char TestAuthMetadataProcessor::kGoodGuy[] = "Dr Jekyll";
 const char TestAuthMetadataProcessor::kIdentityPropName[] = "novel identity";
 
 
-}  // namespace
-
 class Proxy : public ::grpc::cpp::test::util::TestService::Service {
  public:
   Proxy(std::shared_ptr<Channel> channel)
@@ -353,14 +351,24 @@ class TestServiceImplDupPkg
   }
 };
 
-/* Param is whether or not to use a proxy -- some tests use TEST_F as they don't
-   need this functionality */
-class End2endTest : public ::testing::TestWithParam<bool> {
+class TestScenario {
+ public:
+  TestScenario(bool proxy, bool tls) : use_proxy(proxy), use_tls(tls) {}
+  void Log() const {
+    gpr_log(GPR_INFO, "Scenario: proxy %d, tls %d", use_proxy, use_tls);
+  }
+  bool use_proxy;
+  bool use_tls;
+};
+
+class End2endTest : public ::testing::TestWithParam<TestScenario> {
  protected:
   End2endTest()
       : is_server_started_(false),
         kMaxMessageSize_(8192),
-        special_service_("special") {}
+        special_service_("special") {
+    GetParam().Log();
+  }
 
   void TearDown() GRPC_OVERRIDE {
     if (is_server_started_) {
@@ -374,13 +382,16 @@ class End2endTest : public ::testing::TestWithParam<bool> {
     server_address_ << "127.0.0.1:" << port;
     // Setup server
     ServerBuilder builder;
-    SslServerCredentialsOptions::PemKeyCertPair pkcp = {test_server1_key,
-                                                        test_server1_cert};
-    SslServerCredentialsOptions ssl_opts;
-    ssl_opts.pem_root_certs = "";
-    ssl_opts.pem_key_cert_pairs.push_back(pkcp);
-    auto server_creds = SslServerCredentials(ssl_opts);
-    server_creds->SetAuthMetadataProcessor(processor);
+    auto server_creds = InsecureServerCredentials();
+    if (GetParam().use_tls) {
+      SslServerCredentialsOptions::PemKeyCertPair pkcp = {test_server1_key,
+                                                          test_server1_cert};
+      SslServerCredentialsOptions ssl_opts;
+      ssl_opts.pem_root_certs = "";
+      ssl_opts.pem_key_cert_pairs.push_back(pkcp);
+      server_creds = SslServerCredentials(ssl_opts);
+      server_creds->SetAuthMetadataProcessor(processor);
+    }
     builder.AddListeningPort(server_address_.str(), server_creds);
     builder.RegisterService(&service_);
     builder.RegisterService("foo.test.youtube.com", &special_service_);
@@ -396,17 +407,20 @@ class End2endTest : public ::testing::TestWithParam<bool> {
       StartServer(std::shared_ptr<AuthMetadataProcessor>());
     }
     EXPECT_TRUE(is_server_started_);
-    SslCredentialsOptions ssl_opts = {test_root_cert, "", ""};
     ChannelArguments args;
-    args.SetSslTargetNameOverride("foo.test.google.fr");
+    auto channel_creds = InsecureCredentials();
+    if (GetParam().use_tls) {
+      SslCredentialsOptions ssl_opts = {test_root_cert, "", ""};
+      args.SetSslTargetNameOverride("foo.test.google.fr");
+      channel_creds = SslCredentials(ssl_opts);
+    }
     args.SetString(GRPC_ARG_SECONDARY_USER_AGENT_STRING, "end2end_test");
-    channel_ = CreateCustomChannel(server_address_.str(),
-                                   SslCredentials(ssl_opts), args);
+    channel_ = CreateCustomChannel(server_address_.str(), channel_creds, args);
   }
 
-  void ResetStub(bool use_proxy) {
+  void ResetStub() {
     ResetChannel();
-    if (use_proxy) {
+    if (GetParam().use_proxy) {
       proxy_service_.reset(new Proxy(channel_));
       int port = grpc_pick_unused_port_or_die();
       std::ostringstream proxyaddr;
@@ -450,124 +464,8 @@ static void SendRpc(grpc::cpp::test::util::TestService::Stub* stub,
   }
 }
 
-TEST_F(End2endTest, SimpleRpcWithHost) {
-  ResetStub(false);
-
-  EchoRequest request;
-  EchoResponse response;
-  request.set_message("Hello");
-
-  ClientContext context;
-  context.set_authority("foo.test.youtube.com");
-  Status s = stub_->Echo(&context, request, &response);
-  EXPECT_EQ(response.message(), request.message());
-  EXPECT_TRUE(response.has_param());
-  EXPECT_EQ("special", response.param().host());
-  EXPECT_TRUE(s.ok());
-}
-
-TEST_P(End2endTest, SimpleRpc) {
-  ResetStub(GetParam());
-  SendRpc(stub_.get(), 1);
-}
-
-TEST_P(End2endTest, MultipleRpcs) {
-  ResetStub(GetParam());
-  std::vector<std::thread*> threads;
-  for (int i = 0; i < 10; ++i) {
-    threads.push_back(new std::thread(SendRpc, stub_.get(), 10));
-  }
-  for (int i = 0; i < 10; ++i) {
-    threads[i]->join();
-    delete threads[i];
-  }
-}
-
-// Set a 10us deadline and make sure proper error is returned.
-TEST_P(End2endTest, RpcDeadlineExpires) {
-  ResetStub(GetParam());
-  EchoRequest request;
-  EchoResponse response;
-  request.set_message("Hello");
-
-  ClientContext context;
-  std::chrono::system_clock::time_point deadline =
-      std::chrono::system_clock::now() + std::chrono::microseconds(10);
-  context.set_deadline(deadline);
-  Status s = stub_->Echo(&context, request, &response);
-  EXPECT_EQ(StatusCode::DEADLINE_EXCEEDED, s.error_code());
-}
-
-// Set a long but finite deadline.
-TEST_P(End2endTest, RpcLongDeadline) {
-  ResetStub(GetParam());
-  EchoRequest request;
-  EchoResponse response;
-  request.set_message("Hello");
-
-  ClientContext context;
-  std::chrono::system_clock::time_point deadline =
-      std::chrono::system_clock::now() + std::chrono::hours(1);
-  context.set_deadline(deadline);
-  Status s = stub_->Echo(&context, request, &response);
-  EXPECT_EQ(response.message(), request.message());
-  EXPECT_TRUE(s.ok());
-}
-
-// Ask server to echo back the deadline it sees.
-TEST_P(End2endTest, EchoDeadline) {
-  ResetStub(GetParam());
-  EchoRequest request;
-  EchoResponse response;
-  request.set_message("Hello");
-  request.mutable_param()->set_echo_deadline(true);
-
-  ClientContext context;
-  std::chrono::system_clock::time_point deadline =
-      std::chrono::system_clock::now() + std::chrono::seconds(100);
-  context.set_deadline(deadline);
-  Status s = stub_->Echo(&context, request, &response);
-  EXPECT_EQ(response.message(), request.message());
-  EXPECT_TRUE(s.ok());
-  gpr_timespec sent_deadline;
-  Timepoint2Timespec(deadline, &sent_deadline);
-  // Allow 1 second error.
-  EXPECT_LE(response.param().request_deadline() - sent_deadline.tv_sec, 1);
-  EXPECT_GE(response.param().request_deadline() - sent_deadline.tv_sec, -1);
-}
-
-// Ask server to echo back the deadline it sees. The rpc has no deadline.
-TEST_P(End2endTest, EchoDeadlineForNoDeadlineRpc) {
-  ResetStub(GetParam());
-  EchoRequest request;
-  EchoResponse response;
-  request.set_message("Hello");
-  request.mutable_param()->set_echo_deadline(true);
-
-  ClientContext context;
-  Status s = stub_->Echo(&context, request, &response);
-  EXPECT_EQ(response.message(), request.message());
-  EXPECT_TRUE(s.ok());
-  EXPECT_EQ(response.param().request_deadline(),
-            gpr_inf_future(GPR_CLOCK_REALTIME).tv_sec);
-}
-
-TEST_P(End2endTest, UnimplementedRpc) {
-  ResetStub(GetParam());
-  EchoRequest request;
-  EchoResponse response;
-  request.set_message("Hello");
-
-  ClientContext context;
-  Status s = stub_->Unimplemented(&context, request, &response);
-  EXPECT_FALSE(s.ok());
-  EXPECT_EQ(s.error_code(), grpc::StatusCode::UNIMPLEMENTED);
-  EXPECT_EQ(s.error_message(), "");
-  EXPECT_EQ(response.message(), "");
-}
-
-TEST_F(End2endTest, RequestStreamOneRequest) {
-  ResetStub(false);
+TEST_P(End2endTest, RequestStreamOneRequest) {
+  ResetStub();
   EchoRequest request;
   EchoResponse response;
   ClientContext context;
@@ -581,8 +479,8 @@ TEST_F(End2endTest, RequestStreamOneRequest) {
   EXPECT_TRUE(s.ok());
 }
 
-TEST_F(End2endTest, RequestStreamTwoRequests) {
-  ResetStub(false);
+TEST_P(End2endTest, RequestStreamTwoRequests) {
+  ResetStub();
   EchoRequest request;
   EchoResponse response;
   ClientContext context;
@@ -597,8 +495,8 @@ TEST_F(End2endTest, RequestStreamTwoRequests) {
   EXPECT_TRUE(s.ok());
 }
 
-TEST_F(End2endTest, ResponseStream) {
-  ResetStub(false);
+TEST_P(End2endTest, ResponseStream) {
+  ResetStub();
   EchoRequest request;
   EchoResponse response;
   ClientContext context;
@@ -617,8 +515,8 @@ TEST_F(End2endTest, ResponseStream) {
   EXPECT_TRUE(s.ok());
 }
 
-TEST_F(End2endTest, BidiStream) {
-  ResetStub(false);
+TEST_P(End2endTest, BidiStream) {
+  ResetStub();
   EchoRequest request;
   EchoResponse response;
   ClientContext context;
@@ -650,8 +548,8 @@ TEST_F(End2endTest, BidiStream) {
 
 // Talk to the two services with the same name but different package names.
 // The two stubs are created on the same channel.
-TEST_F(End2endTest, DiffPackageServices) {
-  ResetStub(false);
+TEST_P(End2endTest, DiffPackageServices) {
+  ResetStub();
   EchoRequest request;
   EchoResponse response;
   request.set_message("Hello");
@@ -670,33 +568,6 @@ TEST_F(End2endTest, DiffPackageServices) {
   EXPECT_TRUE(s.ok());
 }
 
-// rpc and stream should fail on bad credentials.
-TEST_F(End2endTest, BadCredentials) {
-  std::shared_ptr<Credentials> bad_creds = GoogleRefreshTokenCredentials("");
-  EXPECT_EQ(static_cast<Credentials*>(nullptr), bad_creds.get());
-  std::shared_ptr<Channel> channel =
-      CreateChannel(server_address_.str(), bad_creds);
-  std::unique_ptr<grpc::cpp::test::util::TestService::Stub> stub(
-      grpc::cpp::test::util::TestService::NewStub(channel));
-  EchoRequest request;
-  EchoResponse response;
-  ClientContext context;
-  request.set_message("Hello");
-
-  Status s = stub->Echo(&context, request, &response);
-  EXPECT_EQ("", response.message());
-  EXPECT_FALSE(s.ok());
-  EXPECT_EQ(StatusCode::INVALID_ARGUMENT, s.error_code());
-  EXPECT_EQ("Invalid credentials.", s.error_message());
-
-  ClientContext context2;
-  auto stream = stub->BidiStream(&context2);
-  s = stream->Finish();
-  EXPECT_FALSE(s.ok());
-  EXPECT_EQ(StatusCode::INVALID_ARGUMENT, s.error_code());
-  EXPECT_EQ("Invalid credentials.", s.error_message());
-}
-
 void CancelRpc(ClientContext* context, int delay_us, TestServiceImpl* service) {
   gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
                                gpr_time_from_micros(delay_us, GPR_TIMESPAN)));
@@ -705,40 +576,9 @@ void CancelRpc(ClientContext* context, int delay_us, TestServiceImpl* service) {
   context->TryCancel();
 }
 
-// Client cancels rpc after 10ms
-TEST_P(End2endTest, ClientCancelsRpc) {
-  ResetStub(GetParam());
-  EchoRequest request;
-  EchoResponse response;
-  request.set_message("Hello");
-  const int kCancelDelayUs = 10 * 1000;
-  request.mutable_param()->set_client_cancel_after_us(kCancelDelayUs);
-
-  ClientContext context;
-  std::thread cancel_thread(CancelRpc, &context, kCancelDelayUs, &service_);
-  Status s = stub_->Echo(&context, request, &response);
-  cancel_thread.join();
-  EXPECT_EQ(StatusCode::CANCELLED, s.error_code());
-  EXPECT_EQ(s.error_message(), "Cancelled");
-}
-
-// Server cancels rpc after 1ms
-TEST_P(End2endTest, ServerCancelsRpc) {
-  ResetStub(GetParam());
-  EchoRequest request;
-  EchoResponse response;
-  request.set_message("Hello");
-  request.mutable_param()->set_server_cancel_after_us(1000);
-
-  ClientContext context;
-  Status s = stub_->Echo(&context, request, &response);
-  EXPECT_EQ(StatusCode::CANCELLED, s.error_code());
-  EXPECT_TRUE(s.error_message().empty());
-}
-
 // Client cancels request stream after sending two messages
-TEST_F(End2endTest, ClientCancelsRequestStream) {
-  ResetStub(false);
+TEST_P(End2endTest, ClientCancelsRequestStream) {
+  ResetStub();
   EchoRequest request;
   EchoResponse response;
   ClientContext context;
@@ -757,8 +597,8 @@ TEST_F(End2endTest, ClientCancelsRequestStream) {
 }
 
 // Client cancels server stream after sending some messages
-TEST_F(End2endTest, ClientCancelsResponseStream) {
-  ResetStub(false);
+TEST_P(End2endTest, ClientCancelsResponseStream) {
+  ResetStub();
   EchoRequest request;
   EchoResponse response;
   ClientContext context;
@@ -789,8 +629,8 @@ TEST_F(End2endTest, ClientCancelsResponseStream) {
 }
 
 // Client cancels bidi stream after sending some messages
-TEST_F(End2endTest, ClientCancelsBidi) {
-  ResetStub(false);
+TEST_P(End2endTest, ClientCancelsBidi) {
+  ResetStub();
   EchoRequest request;
   EchoResponse response;
   ClientContext context;
@@ -821,8 +661,8 @@ TEST_F(End2endTest, ClientCancelsBidi) {
   EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code());
 }
 
-TEST_F(End2endTest, RpcMaxMessageSize) {
-  ResetStub(false);
+TEST_P(End2endTest, RpcMaxMessageSize) {
+  ResetStub();
   EchoRequest request;
   EchoResponse response;
   request.set_message(string(kMaxMessageSize_ * 2, 'a'));
@@ -832,110 +672,349 @@ TEST_F(End2endTest, RpcMaxMessageSize) {
   EXPECT_FALSE(s.ok());
 }
 
-bool MetadataContains(
-    const std::multimap<grpc::string_ref, grpc::string_ref>& metadata,
-    const grpc::string& key, const grpc::string& value) {
-  int count = 0;
+// Client sends 20 requests and the server returns CANCELLED status after
+// reading 10 requests.
+TEST_P(End2endTest, RequestStreamServerEarlyCancelTest) {
+  ResetStub();
+  EchoRequest request;
+  EchoResponse response;
+  ClientContext context;
 
-  for (std::multimap<grpc::string_ref, grpc::string_ref>::const_iterator iter =
-           metadata.begin();
-       iter != metadata.end(); ++iter) {
-    if (ToString(iter->first) == key && ToString(iter->second) == value) {
-      count++;
-    }
+  context.AddMetadata(kServerCancelAfterReads, "10");
+  auto stream = stub_->RequestStream(&context, &response);
+  request.set_message("hello");
+  int send_messages = 20;
+  while (send_messages > 0) {
+    EXPECT_TRUE(stream->Write(request));
+    send_messages--;
   }
-  return count == 1;
+  stream->WritesDone();
+  Status s = stream->Finish();
+  EXPECT_EQ(s.error_code(), StatusCode::CANCELLED);
 }
 
-TEST_F(End2endTest, SetPerCallCredentials) {
-  ResetStub(false);
-  EchoRequest request;
-  EchoResponse response;
-  ClientContext context;
-  std::shared_ptr<Credentials> creds =
-      GoogleIAMCredentials("fake_token", "fake_selector");
-  context.set_credentials(creds);
-  request.set_message("Hello");
-  request.mutable_param()->set_echo_metadata(true);
+namespace {
+void ReaderThreadFunc(ClientReaderWriter<EchoRequest, EchoResponse>* stream,
+                      gpr_event* ev) {
+  EchoResponse resp;
+  gpr_event_set(ev, (void*)1);
+  while (stream->Read(&resp)) {
+    gpr_log(GPR_INFO, "Read message");
+  }
+}
+}  // namespace
 
-  Status s = stub_->Echo(&context, request, &response);
-  EXPECT_EQ(request.message(), response.message());
+// Run a Read and a WritesDone simultaneously.
+TEST_P(End2endTest, SimultaneousReadWritesDone) {
+  ResetStub();
+  ClientContext context;
+  gpr_event ev;
+  gpr_event_init(&ev);
+  auto stream = stub_->BidiStream(&context);
+  std::thread reader_thread(ReaderThreadFunc, stream.get(), &ev);
+  gpr_event_wait(&ev, gpr_inf_future(GPR_CLOCK_REALTIME));
+  stream->WritesDone();
+  Status s = stream->Finish();
   EXPECT_TRUE(s.ok());
-  EXPECT_TRUE(MetadataContains(context.GetServerTrailingMetadata(),
-                               GRPC_IAM_AUTHORIZATION_TOKEN_METADATA_KEY,
-                               "fake_token"));
-  EXPECT_TRUE(MetadataContains(context.GetServerTrailingMetadata(),
-                               GRPC_IAM_AUTHORITY_SELECTOR_METADATA_KEY,
-                               "fake_selector"));
+  reader_thread.join();
 }
 
-TEST_F(End2endTest, InsecurePerCallCredentials) {
-  ResetStub(false);
-  EchoRequest request;
-  EchoResponse response;
-  ClientContext context;
-  std::shared_ptr<Credentials> creds = InsecureCredentials();
-  context.set_credentials(creds);
-  request.set_message("Hello");
-  request.mutable_param()->set_echo_metadata(true);
+TEST_P(End2endTest, ChannelState) {
+  ResetStub();
+  // Start IDLE
+  EXPECT_EQ(GRPC_CHANNEL_IDLE, channel_->GetState(false));
 
-  Status s = stub_->Echo(&context, request, &response);
-  EXPECT_EQ(StatusCode::CANCELLED, s.error_code());
-  EXPECT_EQ("Failed to set credentials to rpc.", s.error_message());
+  // Did not ask to connect, no state change.
+  CompletionQueue cq;
+  std::chrono::system_clock::time_point deadline =
+      std::chrono::system_clock::now() + std::chrono::milliseconds(10);
+  channel_->NotifyOnStateChange(GRPC_CHANNEL_IDLE, deadline, &cq, NULL);
+  void* tag;
+  bool ok = true;
+  cq.Next(&tag, &ok);
+  EXPECT_FALSE(ok);
+
+  EXPECT_EQ(GRPC_CHANNEL_IDLE, channel_->GetState(true));
+  EXPECT_TRUE(channel_->WaitForStateChange(GRPC_CHANNEL_IDLE,
+                                           gpr_inf_future(GPR_CLOCK_REALTIME)));
+  EXPECT_EQ(GRPC_CHANNEL_CONNECTING, channel_->GetState(false));
 }
 
-TEST_F(End2endTest, OverridePerCallCredentials) {
-  ResetStub(false);
-  EchoRequest request;
-  EchoResponse response;
-  ClientContext context;
-  std::shared_ptr<Credentials> creds1 =
-      GoogleIAMCredentials("fake_token1", "fake_selector1");
-  context.set_credentials(creds1);
-  std::shared_ptr<Credentials> creds2 =
-      GoogleIAMCredentials("fake_token2", "fake_selector2");
-  context.set_credentials(creds2);
+// Takes 10s.
+TEST_P(End2endTest, ChannelStateTimeout) {
+  if (GetParam().use_tls) {
+    return;
+  }
+  int port = grpc_pick_unused_port_or_die();
+  std::ostringstream server_address;
+  server_address << "127.0.0.1:" << port;
+  // Channel to non-existing server
+  auto channel = CreateChannel(server_address.str(), InsecureCredentials());
+  // Start IDLE
+  EXPECT_EQ(GRPC_CHANNEL_IDLE, channel->GetState(true));
+
+  auto state = GRPC_CHANNEL_IDLE;
+  for (int i = 0; i < 10; i++) {
+    channel->WaitForStateChange(
+        state, std::chrono::system_clock::now() + std::chrono::seconds(1));
+    state = channel->GetState(false);
+  }
+}
+
+// Talking to a non-existing service.
+TEST_P(End2endTest, NonExistingService) {
+  ResetChannel();
+  std::unique_ptr<grpc::cpp::test::util::UnimplementedService::Stub> stub;
+  stub = grpc::cpp::test::util::UnimplementedService::NewStub(channel_);
+
+  EchoRequest request;
+  EchoResponse response;
   request.set_message("Hello");
-  request.mutable_param()->set_echo_metadata(true);
 
+  ClientContext context;
+  Status s = stub->Unimplemented(&context, request, &response);
+  EXPECT_EQ(StatusCode::UNIMPLEMENTED, s.error_code());
+  EXPECT_EQ("", s.error_message());
+}
+
+//////////////////////////////////////////////////////////////////////////
+// Test with and without a proxy.
+class ProxyEnd2endTest : public End2endTest {
+ protected:
+};
+
+TEST_P(ProxyEnd2endTest, SimpleRpc) {
+  ResetStub();
+  SendRpc(stub_.get(), 1);
+}
+
+TEST_P(ProxyEnd2endTest, MultipleRpcs) {
+  ResetStub();
+  std::vector<std::thread*> threads;
+  for (int i = 0; i < 10; ++i) {
+    threads.push_back(new std::thread(SendRpc, stub_.get(), 10));
+  }
+  for (int i = 0; i < 10; ++i) {
+    threads[i]->join();
+    delete threads[i];
+  }
+}
+
+// Set a 10us deadline and make sure proper error is returned.
+TEST_P(ProxyEnd2endTest, RpcDeadlineExpires) {
+  ResetStub();
+  EchoRequest request;
+  EchoResponse response;
+  request.set_message("Hello");
+
+  ClientContext context;
+  std::chrono::system_clock::time_point deadline =
+      std::chrono::system_clock::now() + std::chrono::microseconds(10);
+  context.set_deadline(deadline);
   Status s = stub_->Echo(&context, request, &response);
-  EXPECT_TRUE(MetadataContains(context.GetServerTrailingMetadata(),
-                               GRPC_IAM_AUTHORIZATION_TOKEN_METADATA_KEY,
-                               "fake_token2"));
-  EXPECT_TRUE(MetadataContains(context.GetServerTrailingMetadata(),
-                               GRPC_IAM_AUTHORITY_SELECTOR_METADATA_KEY,
-                               "fake_selector2"));
-  EXPECT_FALSE(MetadataContains(context.GetServerTrailingMetadata(),
-                                GRPC_IAM_AUTHORIZATION_TOKEN_METADATA_KEY,
-                                "fake_token1"));
-  EXPECT_FALSE(MetadataContains(context.GetServerTrailingMetadata(),
-                                GRPC_IAM_AUTHORITY_SELECTOR_METADATA_KEY,
-                                "fake_selector1"));
-  EXPECT_EQ(request.message(), response.message());
+  EXPECT_EQ(StatusCode::DEADLINE_EXCEEDED, s.error_code());
+}
+
+// Set a long but finite deadline.
+TEST_P(ProxyEnd2endTest, RpcLongDeadline) {
+  ResetStub();
+  EchoRequest request;
+  EchoResponse response;
+  request.set_message("Hello");
+
+  ClientContext context;
+  std::chrono::system_clock::time_point deadline =
+      std::chrono::system_clock::now() + std::chrono::hours(1);
+  context.set_deadline(deadline);
+  Status s = stub_->Echo(&context, request, &response);
+  EXPECT_EQ(response.message(), request.message());
   EXPECT_TRUE(s.ok());
 }
 
-TEST_F(End2endTest, NonBlockingAuthMetadataPluginFailure) {
-  ResetStub(false);
+// Ask server to echo back the deadline it sees.
+TEST_P(ProxyEnd2endTest, EchoDeadline) {
+  ResetStub();
   EchoRequest request;
   EchoResponse response;
+  request.set_message("Hello");
+  request.mutable_param()->set_echo_deadline(true);
+
   ClientContext context;
-  context.set_credentials(
-      MetadataCredentialsFromPlugin(std::unique_ptr<MetadataCredentialsPlugin>(
-          new TestMetadataCredentialsPlugin(
-              "Does not matter, will fail anyway (see 3rd param)", false,
-              false))));
+  std::chrono::system_clock::time_point deadline =
+      std::chrono::system_clock::now() + std::chrono::seconds(100);
+  context.set_deadline(deadline);
+  Status s = stub_->Echo(&context, request, &response);
+  EXPECT_EQ(response.message(), request.message());
+  EXPECT_TRUE(s.ok());
+  gpr_timespec sent_deadline;
+  Timepoint2Timespec(deadline, &sent_deadline);
+  // Allow 1 second error.
+  EXPECT_LE(response.param().request_deadline() - sent_deadline.tv_sec, 1);
+  EXPECT_GE(response.param().request_deadline() - sent_deadline.tv_sec, -1);
+}
+
+// Ask server to echo back the deadline it sees. The rpc has no deadline.
+TEST_P(ProxyEnd2endTest, EchoDeadlineForNoDeadlineRpc) {
+  ResetStub();
+  EchoRequest request;
+  EchoResponse response;
   request.set_message("Hello");
+  request.mutable_param()->set_echo_deadline(true);
 
+  ClientContext context;
   Status s = stub_->Echo(&context, request, &response);
+  EXPECT_EQ(response.message(), request.message());
+  EXPECT_TRUE(s.ok());
+  EXPECT_EQ(response.param().request_deadline(),
+            gpr_inf_future(GPR_CLOCK_REALTIME).tv_sec);
+}
+
+TEST_P(ProxyEnd2endTest, UnimplementedRpc) {
+  ResetStub();
+  EchoRequest request;
+  EchoResponse response;
+  request.set_message("Hello");
+
+  ClientContext context;
+  Status s = stub_->Unimplemented(&context, request, &response);
   EXPECT_FALSE(s.ok());
-  EXPECT_EQ(s.error_code(), StatusCode::UNAUTHENTICATED);
+  EXPECT_EQ(s.error_code(), grpc::StatusCode::UNIMPLEMENTED);
+  EXPECT_EQ(s.error_message(), "");
+  EXPECT_EQ(response.message(), "");
 }
 
-TEST_F(End2endTest, NonBlockingAuthMetadataPluginAndProcessorSuccess) {
-  auto* processor = new TestAuthMetadataProcessor(false);
+// Client cancels rpc after 10ms
+TEST_P(ProxyEnd2endTest, ClientCancelsRpc) {
+  ResetStub();
+  EchoRequest request;
+  EchoResponse response;
+  request.set_message("Hello");
+  const int kCancelDelayUs = 10 * 1000;
+  request.mutable_param()->set_client_cancel_after_us(kCancelDelayUs);
+
+  ClientContext context;
+  std::thread cancel_thread(CancelRpc, &context, kCancelDelayUs, &service_);
+  Status s = stub_->Echo(&context, request, &response);
+  cancel_thread.join();
+  EXPECT_EQ(StatusCode::CANCELLED, s.error_code());
+  EXPECT_EQ(s.error_message(), "Cancelled");
+}
+
+// Server cancels rpc after 1ms
+TEST_P(ProxyEnd2endTest, ServerCancelsRpc) {
+  ResetStub();
+  EchoRequest request;
+  EchoResponse response;
+  request.set_message("Hello");
+  request.mutable_param()->set_server_cancel_after_us(1000);
+
+  ClientContext context;
+  Status s = stub_->Echo(&context, request, &response);
+  EXPECT_EQ(StatusCode::CANCELLED, s.error_code());
+  EXPECT_TRUE(s.error_message().empty());
+}
+
+// Make the response larger than the flow control window.
+TEST_P(ProxyEnd2endTest, HugeResponse) {
+  ResetStub();
+  EchoRequest request;
+  EchoResponse response;
+  request.set_message("huge response");
+  const size_t kResponseSize = 1024 * (1024 + 10);
+  request.mutable_param()->set_response_message_length(kResponseSize);
+
+  ClientContext context;
+  Status s = stub_->Echo(&context, request, &response);
+  EXPECT_EQ(kResponseSize, response.message().size());
+  EXPECT_TRUE(s.ok());
+}
+
+TEST_P(ProxyEnd2endTest, Peer) {
+  ResetStub();
+  EchoRequest request;
+  EchoResponse response;
+  request.set_message("hello");
+  request.mutable_param()->set_echo_peer(true);
+
+  ClientContext context;
+  Status s = stub_->Echo(&context, request, &response);
+  EXPECT_EQ(response.message(), request.message());
+  EXPECT_TRUE(s.ok());
+  EXPECT_TRUE(CheckIsLocalhost(response.param().peer()));
+  EXPECT_TRUE(CheckIsLocalhost(context.peer()));
+}
+
+//////////////////////////////////////////////////////////////////////////
+class SecureEnd2endTest : public End2endTest {
+ protected:
+  SecureEnd2endTest() {
+    GPR_ASSERT(!GetParam().use_proxy);
+    GPR_ASSERT(GetParam().use_tls);
+  }
+};
+
+TEST_P(SecureEnd2endTest, SimpleRpcWithHost) {
+  ResetStub();
+
+  EchoRequest request;
+  EchoResponse response;
+  request.set_message("Hello");
+
+  ClientContext context;
+  context.set_authority("foo.test.youtube.com");
+  Status s = stub_->Echo(&context, request, &response);
+  EXPECT_EQ(response.message(), request.message());
+  EXPECT_TRUE(response.has_param());
+  EXPECT_EQ("special", response.param().host());
+  EXPECT_TRUE(s.ok());
+}
+
+// rpc and stream should fail on bad credentials.
+TEST_P(SecureEnd2endTest, BadCredentials) {
+  std::shared_ptr<Credentials> bad_creds = GoogleRefreshTokenCredentials("");
+  EXPECT_EQ(static_cast<Credentials*>(nullptr), bad_creds.get());
+  std::shared_ptr<Channel> channel =
+      CreateChannel(server_address_.str(), bad_creds);
+  std::unique_ptr<grpc::cpp::test::util::TestService::Stub> stub(
+      grpc::cpp::test::util::TestService::NewStub(channel));
+  EchoRequest request;
+  EchoResponse response;
+  ClientContext context;
+  request.set_message("Hello");
+
+  Status s = stub->Echo(&context, request, &response);
+  EXPECT_EQ("", response.message());
+  EXPECT_FALSE(s.ok());
+  EXPECT_EQ(StatusCode::INVALID_ARGUMENT, s.error_code());
+  EXPECT_EQ("Invalid credentials.", s.error_message());
+
+  ClientContext context2;
+  auto stream = stub->BidiStream(&context2);
+  s = stream->Finish();
+  EXPECT_FALSE(s.ok());
+  EXPECT_EQ(StatusCode::INVALID_ARGUMENT, s.error_code());
+  EXPECT_EQ("Invalid credentials.", s.error_message());
+}
+
+bool MetadataContains(
+    const std::multimap<grpc::string_ref, grpc::string_ref>& metadata,
+    const grpc::string& key, const grpc::string& value) {
+  int count = 0;
+
+  for (std::multimap<grpc::string_ref, grpc::string_ref>::const_iterator iter =
+           metadata.begin();
+       iter != metadata.end(); ++iter) {
+    if (ToString(iter->first) == key && ToString(iter->second) == value) {
+      count++;
+    }
+  }
+  return count == 1;
+}
+
+TEST_P(SecureEnd2endTest, BlockingAuthMetadataPluginAndProcessorSuccess) {
+  auto* processor = new TestAuthMetadataProcessor(true);
   StartServer(std::shared_ptr<AuthMetadataProcessor>(processor));
-  ResetStub(false);
+  ResetStub();
   EchoRequest request;
   EchoResponse response;
   ClientContext context;
@@ -955,10 +1034,10 @@ TEST_F(End2endTest, NonBlockingAuthMetadataPluginAndProcessorSuccess) {
       grpc::string("Bearer ") + TestAuthMetadataProcessor::kGoodGuy));
 }
 
-TEST_F(End2endTest, NonBlockingAuthMetadataPluginAndProcessorFailure) {
-  auto* processor = new TestAuthMetadataProcessor(false);
+TEST_P(SecureEnd2endTest, BlockingAuthMetadataPluginAndProcessorFailure) {
+  auto* processor = new TestAuthMetadataProcessor(true);
   StartServer(std::shared_ptr<AuthMetadataProcessor>(processor));
-  ResetStub(false);
+  ResetStub();
   EchoRequest request;
   EchoResponse response;
   ClientContext context;
@@ -969,16 +1048,83 @@ TEST_F(End2endTest, NonBlockingAuthMetadataPluginAndProcessorFailure) {
   EXPECT_FALSE(s.ok());
   EXPECT_EQ(s.error_code(), StatusCode::UNAUTHENTICATED);
 }
+TEST_P(SecureEnd2endTest, SetPerCallCredentials) {
+  ResetStub();
+  EchoRequest request;
+  EchoResponse response;
+  ClientContext context;
+  std::shared_ptr<Credentials> creds =
+      GoogleIAMCredentials("fake_token", "fake_selector");
+  context.set_credentials(creds);
+  request.set_message("Hello");
+  request.mutable_param()->set_echo_metadata(true);
 
-TEST_F(End2endTest, BlockingAuthMetadataPluginFailure) {
-  ResetStub(false);
+  Status s = stub_->Echo(&context, request, &response);
+  EXPECT_EQ(request.message(), response.message());
+  EXPECT_TRUE(s.ok());
+  EXPECT_TRUE(MetadataContains(context.GetServerTrailingMetadata(),
+                               GRPC_IAM_AUTHORIZATION_TOKEN_METADATA_KEY,
+                               "fake_token"));
+  EXPECT_TRUE(MetadataContains(context.GetServerTrailingMetadata(),
+                               GRPC_IAM_AUTHORITY_SELECTOR_METADATA_KEY,
+                               "fake_selector"));
+}
+
+TEST_P(SecureEnd2endTest, InsecurePerCallCredentials) {
+  ResetStub();
+  EchoRequest request;
+  EchoResponse response;
+  ClientContext context;
+  std::shared_ptr<Credentials> creds = InsecureCredentials();
+  context.set_credentials(creds);
+  request.set_message("Hello");
+  request.mutable_param()->set_echo_metadata(true);
+
+  Status s = stub_->Echo(&context, request, &response);
+  EXPECT_EQ(StatusCode::CANCELLED, s.error_code());
+  EXPECT_EQ("Failed to set credentials to rpc.", s.error_message());
+}
+
+TEST_P(SecureEnd2endTest, OverridePerCallCredentials) {
+  ResetStub();
+  EchoRequest request;
+  EchoResponse response;
+  ClientContext context;
+  std::shared_ptr<Credentials> creds1 =
+      GoogleIAMCredentials("fake_token1", "fake_selector1");
+  context.set_credentials(creds1);
+  std::shared_ptr<Credentials> creds2 =
+      GoogleIAMCredentials("fake_token2", "fake_selector2");
+  context.set_credentials(creds2);
+  request.set_message("Hello");
+  request.mutable_param()->set_echo_metadata(true);
+
+  Status s = stub_->Echo(&context, request, &response);
+  EXPECT_TRUE(MetadataContains(context.GetServerTrailingMetadata(),
+                               GRPC_IAM_AUTHORIZATION_TOKEN_METADATA_KEY,
+                               "fake_token2"));
+  EXPECT_TRUE(MetadataContains(context.GetServerTrailingMetadata(),
+                               GRPC_IAM_AUTHORITY_SELECTOR_METADATA_KEY,
+                               "fake_selector2"));
+  EXPECT_FALSE(MetadataContains(context.GetServerTrailingMetadata(),
+                                GRPC_IAM_AUTHORIZATION_TOKEN_METADATA_KEY,
+                                "fake_token1"));
+  EXPECT_FALSE(MetadataContains(context.GetServerTrailingMetadata(),
+                                GRPC_IAM_AUTHORITY_SELECTOR_METADATA_KEY,
+                                "fake_selector1"));
+  EXPECT_EQ(request.message(), response.message());
+  EXPECT_TRUE(s.ok());
+}
+
+TEST_P(SecureEnd2endTest, NonBlockingAuthMetadataPluginFailure) {
+  ResetStub();
   EchoRequest request;
   EchoResponse response;
   ClientContext context;
   context.set_credentials(
       MetadataCredentialsFromPlugin(std::unique_ptr<MetadataCredentialsPlugin>(
           new TestMetadataCredentialsPlugin(
-              "Does not matter, will fail anyway (see 3rd param)", true,
+              "Does not matter, will fail anyway (see 3rd param)", false,
               false))));
   request.set_message("Hello");
 
@@ -987,10 +1133,10 @@ TEST_F(End2endTest, BlockingAuthMetadataPluginFailure) {
   EXPECT_EQ(s.error_code(), StatusCode::UNAUTHENTICATED);
 }
 
-TEST_F(End2endTest, BlockingAuthMetadataPluginAndProcessorSuccess) {
-  auto* processor = new TestAuthMetadataProcessor(true);
+TEST_P(SecureEnd2endTest, NonBlockingAuthMetadataPluginAndProcessorSuccess) {
+  auto* processor = new TestAuthMetadataProcessor(false);
   StartServer(std::shared_ptr<AuthMetadataProcessor>(processor));
-  ResetStub(false);
+  ResetStub();
   EchoRequest request;
   EchoResponse response;
   ClientContext context;
@@ -1010,10 +1156,10 @@ TEST_F(End2endTest, BlockingAuthMetadataPluginAndProcessorSuccess) {
       grpc::string("Bearer ") + TestAuthMetadataProcessor::kGoodGuy));
 }
 
-TEST_F(End2endTest, BlockingAuthMetadataPluginAndProcessorFailure) {
-  auto* processor = new TestAuthMetadataProcessor(true);
+TEST_P(SecureEnd2endTest, NonBlockingAuthMetadataPluginAndProcessorFailure) {
+  auto* processor = new TestAuthMetadataProcessor(false);
   StartServer(std::shared_ptr<AuthMetadataProcessor>(processor));
-  ResetStub(false);
+  ResetStub();
   EchoRequest request;
   EchoResponse response;
   ClientContext context;
@@ -1025,29 +1171,25 @@ TEST_F(End2endTest, BlockingAuthMetadataPluginAndProcessorFailure) {
   EXPECT_EQ(s.error_code(), StatusCode::UNAUTHENTICATED);
 }
 
-// Client sends 20 requests and the server returns CANCELLED status after
-// reading 10 requests.
-TEST_F(End2endTest, RequestStreamServerEarlyCancelTest) {
-  ResetStub(false);
+TEST_P(SecureEnd2endTest, BlockingAuthMetadataPluginFailure) {
+  ResetStub();
   EchoRequest request;
   EchoResponse response;
   ClientContext context;
+  context.set_credentials(
+      MetadataCredentialsFromPlugin(std::unique_ptr<MetadataCredentialsPlugin>(
+          new TestMetadataCredentialsPlugin(
+              "Does not matter, will fail anyway (see 3rd param)", true,
+              false))));
+  request.set_message("Hello");
 
-  context.AddMetadata(kServerCancelAfterReads, "10");
-  auto stream = stub_->RequestStream(&context, &response);
-  request.set_message("hello");
-  int send_messages = 20;
-  while (send_messages > 0) {
-    EXPECT_TRUE(stream->Write(request));
-    send_messages--;
-  }
-  stream->WritesDone();
-  Status s = stream->Finish();
-  EXPECT_EQ(s.error_code(), StatusCode::CANCELLED);
+  Status s = stub_->Echo(&context, request, &response);
+  EXPECT_FALSE(s.ok());
+  EXPECT_EQ(s.error_code(), StatusCode::UNAUTHENTICATED);
 }
 
-TEST_F(End2endTest, ClientAuthContext) {
-  ResetStub(false);
+TEST_P(SecureEnd2endTest, ClientAuthContext) {
+  ResetStub();
   EchoRequest request;
   EchoResponse response;
   request.set_message("Hello");
@@ -1072,119 +1214,20 @@ TEST_F(End2endTest, ClientAuthContext) {
   EXPECT_EQ("*.test.youtube.com", ToString(auth_ctx->GetPeerIdentity()[2]));
 }
 
-// Make the response larger than the flow control window.
-TEST_P(End2endTest, HugeResponse) {
-  ResetStub(GetParam());
-  EchoRequest request;
-  EchoResponse response;
-  request.set_message("huge response");
-  const size_t kResponseSize = 1024 * (1024 + 10);
-  request.mutable_param()->set_response_message_length(kResponseSize);
-
-  ClientContext context;
-  Status s = stub_->Echo(&context, request, &response);
-  EXPECT_EQ(kResponseSize, response.message().size());
-  EXPECT_TRUE(s.ok());
-}
-
-namespace {
-void ReaderThreadFunc(ClientReaderWriter<EchoRequest, EchoResponse>* stream,
-                      gpr_event* ev) {
-  EchoResponse resp;
-  gpr_event_set(ev, (void*)1);
-  while (stream->Read(&resp)) {
-    gpr_log(GPR_INFO, "Read message");
-  }
-}
-}  // namespace
-
-// Run a Read and a WritesDone simultaneously.
-TEST_F(End2endTest, SimultaneousReadWritesDone) {
-  ResetStub(false);
-  ClientContext context;
-  gpr_event ev;
-  gpr_event_init(&ev);
-  auto stream = stub_->BidiStream(&context);
-  std::thread reader_thread(ReaderThreadFunc, stream.get(), &ev);
-  gpr_event_wait(&ev, gpr_inf_future(GPR_CLOCK_REALTIME));
-  stream->WritesDone();
-  Status s = stream->Finish();
-  EXPECT_TRUE(s.ok());
-  reader_thread.join();
-}
-
-TEST_P(End2endTest, Peer) {
-  ResetStub(GetParam());
-  EchoRequest request;
-  EchoResponse response;
-  request.set_message("hello");
-  request.mutable_param()->set_echo_peer(true);
-
-  ClientContext context;
-  Status s = stub_->Echo(&context, request, &response);
-  EXPECT_EQ(response.message(), request.message());
-  EXPECT_TRUE(s.ok());
-  EXPECT_TRUE(CheckIsLocalhost(response.param().peer()));
-  EXPECT_TRUE(CheckIsLocalhost(context.peer()));
-}
-
-TEST_F(End2endTest, ChannelState) {
-  ResetStub(false);
-  // Start IDLE
-  EXPECT_EQ(GRPC_CHANNEL_IDLE, channel_->GetState(false));
-
-  // Did not ask to connect, no state change.
-  CompletionQueue cq;
-  std::chrono::system_clock::time_point deadline =
-      std::chrono::system_clock::now() + std::chrono::milliseconds(10);
-  channel_->NotifyOnStateChange(GRPC_CHANNEL_IDLE, deadline, &cq, NULL);
-  void* tag;
-  bool ok = true;
-  cq.Next(&tag, &ok);
-  EXPECT_FALSE(ok);
-
-  EXPECT_EQ(GRPC_CHANNEL_IDLE, channel_->GetState(true));
-  EXPECT_TRUE(channel_->WaitForStateChange(GRPC_CHANNEL_IDLE,
-                                           gpr_inf_future(GPR_CLOCK_REALTIME)));
-  EXPECT_EQ(GRPC_CHANNEL_CONNECTING, channel_->GetState(false));
-}
-
-// Takes 10s.
-TEST_F(End2endTest, ChannelStateTimeout) {
-  int port = grpc_pick_unused_port_or_die();
-  std::ostringstream server_address;
-  server_address << "127.0.0.1:" << port;
-  // Channel to non-existing server
-  auto channel = CreateChannel(server_address.str(), InsecureCredentials());
-  // Start IDLE
-  EXPECT_EQ(GRPC_CHANNEL_IDLE, channel->GetState(true));
-
-  auto state = GRPC_CHANNEL_IDLE;
-  for (int i = 0; i < 10; i++) {
-    channel->WaitForStateChange(
-        state, std::chrono::system_clock::now() + std::chrono::seconds(1));
-    state = channel->GetState(false);
-  }
-}
-
-// Talking to a non-existing service.
-TEST_F(End2endTest, NonExistingService) {
-  ResetChannel();
-  std::unique_ptr<grpc::cpp::test::util::UnimplementedService::Stub> stub;
-  stub = grpc::cpp::test::util::UnimplementedService::NewStub(channel_);
+INSTANTIATE_TEST_CASE_P(End2end, End2endTest,
+                        ::testing::Values(TestScenario(false, true),
+                                          TestScenario(false, false)));
 
-  EchoRequest request;
-  EchoResponse response;
-  request.set_message("Hello");
+INSTANTIATE_TEST_CASE_P(ProxyEnd2end, ProxyEnd2endTest,
+                        ::testing::Values(TestScenario(true, true),
+                                          TestScenario(true, false),
+                                          TestScenario(false, true),
+                                          TestScenario(false, false)));
 
-  ClientContext context;
-  Status s = stub->Unimplemented(&context, request, &response);
-  EXPECT_EQ(StatusCode::UNIMPLEMENTED, s.error_code());
-  EXPECT_EQ("", s.error_message());
-}
-
-INSTANTIATE_TEST_CASE_P(End2end, End2endTest, ::testing::Values(false, true));
+INSTANTIATE_TEST_CASE_P(SecureEnd2end, SecureEnd2endTest,
+                        ::testing::Values(TestScenario(false, true)));
 
+}  // namespace
 }  // namespace testing
 }  // namespace grpc