From 5450f05e0c789ea48ec90483a8520062b8d828c0 Mon Sep 17 00:00:00 2001
From: Nathaniel Manista <nathaniel@google.com>
Date: Thu, 11 Aug 2016 02:24:46 +0000
Subject: [PATCH] Migrate distrib, interop, and stress to GA API

---
 .../tests/interop/_insecure_interop_test.py   |  14 +-
 .../tests/interop/_secure_interop_test.py     |  24 +-
 .../grpcio_tests/tests/interop/client.py      |  56 ++--
 .../grpcio_tests/tests/interop/methods.py     | 270 +++++++++---------
 .../grpcio_tests/tests/interop/server.py      |  12 +-
 .../grpcio_tests/tests/stress/client.py       |  21 +-
 .../tests/stress/metrics_server.py            |   2 +-
 test/distrib/python/distribtest.py            |   4 +-
 8 files changed, 200 insertions(+), 203 deletions(-)

diff --git a/src/python/grpcio_tests/tests/interop/_insecure_interop_test.py b/src/python/grpcio_tests/tests/interop/_insecure_interop_test.py
index c753d6faf0..936c895bd2 100644
--- a/src/python/grpcio_tests/tests/interop/_insecure_interop_test.py
+++ b/src/python/grpcio_tests/tests/interop/_insecure_interop_test.py
@@ -29,9 +29,10 @@
 
 """Insecure client-server interoperability as a unit test."""
 
+from concurrent import futures
 import unittest
 
-from grpc.beta import implementations
+import grpc
 from src.proto.grpc.testing import test_pb2
 
 from tests.interop import _interop_test_case
@@ -44,14 +45,13 @@ class InsecureInteropTest(
     unittest.TestCase):
 
   def setUp(self):
-    self.server = test_pb2.beta_create_TestService_server(methods.TestService())
+    self.server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
+    test_pb2.add_TestServiceServicer_to_server(
+        methods.TestService(), self.server)
     port = self.server.add_insecure_port('[::]:0')
     self.server.start()
-    self.stub = test_pb2.beta_create_TestService_stub(
-        implementations.insecure_channel('localhost', port))
-
-  def tearDown(self):
-    self.server.stop(0)
+    self.stub = test_pb2.TestServiceStub(
+        grpc.insecure_channel('localhost:{}'.format(port)))
 
 
 if __name__ == '__main__':
diff --git a/src/python/grpcio_tests/tests/interop/_secure_interop_test.py b/src/python/grpcio_tests/tests/interop/_secure_interop_test.py
index cb09f54a34..eaca553e1b 100644
--- a/src/python/grpcio_tests/tests/interop/_secure_interop_test.py
+++ b/src/python/grpcio_tests/tests/interop/_secure_interop_test.py
@@ -29,17 +29,16 @@
 
 """Secure client-server interoperability as a unit test."""
 
+from concurrent import futures
 import unittest
 
-from grpc.beta import implementations
+import grpc
 from src.proto.grpc.testing import test_pb2
 
 from tests.interop import _interop_test_case
 from tests.interop import methods
 from tests.interop import resources
 
-from tests.unit.beta import test_utilities
-
 _SERVER_HOST_OVERRIDE = 'foo.test.google.fr'
 
 
@@ -48,19 +47,18 @@ class SecureInteropTest(
     unittest.TestCase):
 
   def setUp(self):
-    self.server = test_pb2.beta_create_TestService_server(methods.TestService())
+    self.server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
+    test_pb2.add_TestServiceServicer_to_server(
+        methods.TestService(), self.server)
     port = self.server.add_secure_port(
-        '[::]:0', implementations.ssl_server_credentials(
+        '[::]:0', grpc.ssl_server_credentials(
             [(resources.private_key(), resources.certificate_chain())]))
     self.server.start()
-    self.stub = test_pb2.beta_create_TestService_stub(
-        test_utilities.not_really_secure_channel(
-            'localhost', port, implementations.ssl_channel_credentials(
-                resources.test_root_certificates()),
-                _SERVER_HOST_OVERRIDE))
-
-  def tearDown(self):
-    self.server.stop(0)
+    self.stub = test_pb2.TestServiceStub(
+        grpc.secure_channel(
+            'localhost:{}'.format(port),
+            grpc.ssl_channel_credentials(resources.test_root_certificates()),
+            (('grpc.ssl_target_name_override', _SERVER_HOST_OVERRIDE,),)))
 
 
 if __name__ == '__main__':
diff --git a/src/python/grpcio_tests/tests/interop/client.py b/src/python/grpcio_tests/tests/interop/client.py
index 8aa1ce30c1..9d61d18975 100644
--- a/src/python/grpcio_tests/tests/interop/client.py
+++ b/src/python/grpcio_tests/tests/interop/client.py
@@ -32,14 +32,12 @@
 import argparse
 from oauth2client import client as oauth2client_client
 
+import grpc
 from grpc.beta import implementations
 from src.proto.grpc.testing import test_pb2
 
 from tests.interop import methods
 from tests.interop import resources
-from tests.unit.beta import test_utilities
-
-_ONE_DAY_IN_SECONDS = 60 * 60 * 24
 
 
 def _args():
@@ -66,41 +64,49 @@ def _args():
   return parser.parse_args()
 
 
+def _application_default_credentials():
+  return oauth2client_client.GoogleCredentials.get_application_default()
+
+
 def _stub(args):
+  target = '{}:{}'.format(args.server_host, args.server_port)
   if args.test_case == 'oauth2_auth_token':
-    creds = oauth2client_client.GoogleCredentials.get_application_default()
-    scoped_creds = creds.create_scoped([args.oauth_scope])
-    access_token = scoped_creds.get_access_token().access_token
-    call_creds = implementations.access_token_call_credentials(access_token)
+    google_credentials = _application_default_credentials()
+    scoped_credentials = google_credentials.create_scoped([args.oauth_scope])
+    access_token = scoped_credentials.get_access_token().access_token
+    call_credentials = grpc.access_token_call_credentials(access_token)
   elif args.test_case == 'compute_engine_creds':
-    creds = oauth2client_client.GoogleCredentials.get_application_default()
-    scoped_creds = creds.create_scoped([args.oauth_scope])
-    call_creds = implementations.google_call_credentials(scoped_creds)
+    google_credentials = _application_default_credentials()
+    scoped_credentials = google_credentials.create_scoped([args.oauth_scope])
+    # TODO(https://github.com/grpc/grpc/issues/6799): Eliminate this last
+    # remaining use of the Beta API.
+    call_credentials = implementations.google_call_credentials(
+        scoped_credentials)
   elif args.test_case == 'jwt_token_creds':
-    creds = oauth2client_client.GoogleCredentials.get_application_default()
-    call_creds = implementations.google_call_credentials(creds)
+    google_credentials = _application_default_credentials()
+    # TODO(https://github.com/grpc/grpc/issues/6799): Eliminate this last
+    # remaining use of the Beta API.
+    call_credentials = implementations.google_call_credentials(
+        google_credentials)
   else:
-    call_creds = None
+    call_credentials = None
   if args.use_tls:
     if args.use_test_ca:
       root_certificates = resources.test_root_certificates()
     else:
       root_certificates = None  # will load default roots.
 
-    channel_creds = implementations.ssl_channel_credentials(root_certificates)
-    if call_creds is not None:
-      channel_creds = implementations.composite_channel_credentials(
-          channel_creds, call_creds)
+    channel_credentials = grpc.ssl_channel_credentials(root_certificates)
+    if call_credentials is not None:
+      channel_credentials = grpc.composite_channel_credentials(
+          channel_credentials, call_credentials)
 
-    channel = test_utilities.not_really_secure_channel(
-        args.server_host, args.server_port, channel_creds,
-        args.server_host_override)
-    stub = test_pb2.beta_create_TestService_stub(channel)
+    channel = grpc.secure_channel(
+        target, channel_credentials,
+        (('grpc.ssl_target_name_override', args.server_host_override,),))
   else:
-    channel = implementations.insecure_channel(
-        args.server_host, args.server_port)
-    stub = test_pb2.beta_create_TestService_stub(channel)
-  return stub
+    channel = grpc.insecure_channel(target)
+  return test_pb2.TestServiceStub(channel)
 
 
 def _test_case_from_arg(test_case_arg):
diff --git a/src/python/grpcio_tests/tests/interop/methods.py b/src/python/grpcio_tests/tests/interop/methods.py
index 97e6c9e27e..7edd75c56c 100644
--- a/src/python/grpcio_tests/tests/interop/methods.py
+++ b/src/python/grpcio_tests/tests/interop/methods.py
@@ -29,8 +29,6 @@
 
 """Implementations of interoperability test methods."""
 
-from __future__ import print_function
-
 import enum
 import json
 import os
@@ -41,26 +39,21 @@ from oauth2client import client as oauth2client_client
 
 import grpc
 from grpc.beta import implementations
-from grpc.beta import interfaces
-from grpc.framework.common import cardinality
-from grpc.framework.interfaces.face import face
 
 from src.proto.grpc.testing import empty_pb2
 from src.proto.grpc.testing import messages_pb2
 from src.proto.grpc.testing import test_pb2
 
-_TIMEOUT = 7
-
 
-class TestService(test_pb2.BetaTestServiceServicer):
+class TestService(test_pb2.TestServiceServicer):
 
   def EmptyCall(self, request, context):
     return empty_pb2.Empty()
 
   def UnaryCall(self, request, context):
     if request.HasField('response_status'):
-      context.code(request.response_status.code)
-      context.details(request.response_status.message)
+      context.set_code(request.response_status.code)
+      context.set_details(request.response_status.message)
     return messages_pb2.SimpleResponse(
         payload=messages_pb2.Payload(
             type=messages_pb2.COMPRESSABLE,
@@ -68,8 +61,8 @@ class TestService(test_pb2.BetaTestServiceServicer):
 
   def StreamingOutputCall(self, request, context):
     if request.HasField('response_status'):
-      context.code(request.response_status.code)
-      context.details(request.response_status.message)
+      context.set_code(request.response_status.code)
+      context.set_details(request.response_status.message)
     for response_parameters in request.response_parameters:
       yield messages_pb2.StreamingOutputCallResponse(
           payload=messages_pb2.Payload(
@@ -79,7 +72,7 @@ class TestService(test_pb2.BetaTestServiceServicer):
   def StreamingInputCall(self, request_iterator, context):
     aggregate_size = 0
     for request in request_iterator:
-      if request.payload and request.payload.body:
+      if request.payload is not None and request.payload.body:
         aggregate_size += len(request.payload.body)
     return messages_pb2.StreamingInputCallResponse(
         aggregated_payload_size=aggregate_size)
@@ -87,8 +80,8 @@ class TestService(test_pb2.BetaTestServiceServicer):
   def FullDuplexCall(self, request_iterator, context):
     for request in request_iterator:
       if request.HasField('response_status'):
-        context.code(request.response_status.code)
-        context.details(request.response_status.message)
+        context.set_code(request.response_status.code)
+        context.set_details(request.response_status.message)
       for response_parameters in request.response_parameters:
         yield messages_pb2.StreamingOutputCallResponse(
             payload=messages_pb2.Payload(
@@ -101,83 +94,80 @@ class TestService(test_pb2.BetaTestServiceServicer):
     return self.FullDuplexCall(request_iterator, context)
 
 
-def _large_unary_common_behavior(stub, fill_username, fill_oauth_scope,
-                                 protocol_options=None):
-  with stub:
-    request = messages_pb2.SimpleRequest(
-        response_type=messages_pb2.COMPRESSABLE, response_size=314159,
-        payload=messages_pb2.Payload(body=b'\x00' * 271828),
-        fill_username=fill_username, fill_oauth_scope=fill_oauth_scope)
-    response_future = stub.UnaryCall.future(request, _TIMEOUT,
-                                            protocol_options=protocol_options)
-    response = response_future.result()
-    if response.payload.type is not messages_pb2.COMPRESSABLE:
-      raise ValueError(
-          'response payload type is "%s"!' % type(response.payload.type))
-    if len(response.payload.body) != 314159:
-      raise ValueError(
-          'response body of incorrect size %d!' % len(response.payload.body))
+def _large_unary_common_behavior(
+    stub, fill_username, fill_oauth_scope, call_credentials):
+  request = messages_pb2.SimpleRequest(
+      response_type=messages_pb2.COMPRESSABLE, response_size=314159,
+      payload=messages_pb2.Payload(body=b'\x00' * 271828),
+      fill_username=fill_username, fill_oauth_scope=fill_oauth_scope)
+  response_future = stub.UnaryCall.future(
+      request, credentials=call_credentials)
+  response = response_future.result()
+  if response.payload.type is not messages_pb2.COMPRESSABLE:
+    raise ValueError(
+        'response payload type is "%s"!' % type(response.payload.type))
+  elif len(response.payload.body) != 314159:
+    raise ValueError(
+        'response body of incorrect size %d!' % len(response.payload.body))
+  else:
     return response
 
 
 def _empty_unary(stub):
-  with stub:
-    response = stub.EmptyCall(empty_pb2.Empty(), _TIMEOUT)
-    if not isinstance(response, empty_pb2.Empty):
-      raise TypeError(
-          'response is of type "%s", not empty_pb2.Empty!', type(response))
+  response = stub.EmptyCall(empty_pb2.Empty())
+  if not isinstance(response, empty_pb2.Empty):
+    raise TypeError(
+        'response is of type "%s", not empty_pb2.Empty!', type(response))
 
 
 def _large_unary(stub):
-  _large_unary_common_behavior(stub, False, False)
+  _large_unary_common_behavior(stub, False, False, None)
 
 
 def _client_streaming(stub):
-  with stub:
-    payload_body_sizes = (27182, 8, 1828, 45904)
-    payloads = (
-        messages_pb2.Payload(body=b'\x00' * size)
-        for size in payload_body_sizes)
-    requests = (
-        messages_pb2.StreamingInputCallRequest(payload=payload)
-        for payload in payloads)
-    response = stub.StreamingInputCall(requests, _TIMEOUT)
-    if response.aggregated_payload_size != 74922:
-      raise ValueError(
-          'incorrect size %d!' % response.aggregated_payload_size)
+  payload_body_sizes = (27182, 8, 1828, 45904,)
+  payloads = (
+      messages_pb2.Payload(body=b'\x00' * size)
+      for size in payload_body_sizes)
+  requests = (
+      messages_pb2.StreamingInputCallRequest(payload=payload)
+      for payload in payloads)
+  response = stub.StreamingInputCall(requests)
+  if response.aggregated_payload_size != 74922:
+    raise ValueError(
+        'incorrect size %d!' % response.aggregated_payload_size)
 
 
 def _server_streaming(stub):
-  sizes = (31415, 9, 2653, 58979)
-
-  with stub:
-    request = messages_pb2.StreamingOutputCallRequest(
-        response_type=messages_pb2.COMPRESSABLE,
-        response_parameters=(
-            messages_pb2.ResponseParameters(size=sizes[0]),
-            messages_pb2.ResponseParameters(size=sizes[1]),
-            messages_pb2.ResponseParameters(size=sizes[2]),
-            messages_pb2.ResponseParameters(size=sizes[3]),
-        ))
-    response_iterator = stub.StreamingOutputCall(request, _TIMEOUT)
-    for index, response in enumerate(response_iterator):
-      if response.payload.type != messages_pb2.COMPRESSABLE:
-        raise ValueError(
-            'response body of invalid type %s!' % response.payload.type)
-      if len(response.payload.body) != sizes[index]:
-        raise ValueError(
-            'response body of invalid size %d!' % len(response.payload.body))
+  sizes = (31415, 9, 2653, 58979,)
+
+  request = messages_pb2.StreamingOutputCallRequest(
+      response_type=messages_pb2.COMPRESSABLE,
+      response_parameters=(
+          messages_pb2.ResponseParameters(size=sizes[0]),
+          messages_pb2.ResponseParameters(size=sizes[1]),
+          messages_pb2.ResponseParameters(size=sizes[2]),
+          messages_pb2.ResponseParameters(size=sizes[3]),
+      )
+  )
+  response_iterator = stub.StreamingOutputCall(request)
+  for index, response in enumerate(response_iterator):
+    if response.payload.type != messages_pb2.COMPRESSABLE:
+      raise ValueError(
+          'response body of invalid type %s!' % response.payload.type)
+    elif len(response.payload.body) != sizes[index]:
+      raise ValueError(
+          'response body of invalid size %d!' % len(response.payload.body))
 
 def _cancel_after_begin(stub):
-  with stub:
-    sizes = (27182, 8, 1828, 45904)
-    payloads = [messages_pb2.Payload(body=b'\x00' * size) for size in sizes]
-    requests = [messages_pb2.StreamingInputCallRequest(payload=payload)
-                for payload in payloads]
-    responses = stub.StreamingInputCall.future(requests, _TIMEOUT)
-    responses.cancel()
-    if not responses.cancelled():
-      raise ValueError('expected call to be cancelled')
+  sizes = (27182, 8, 1828, 45904,)
+  payloads = (messages_pb2.Payload(body=b'\x00' * size) for size in sizes)
+  requests = (messages_pb2.StreamingInputCallRequest(payload=payload)
+              for payload in payloads)
+  response_future = stub.StreamingInputCall.future(requests)
+  response_future.cancel()
+  if not response_future.cancelled():
+    raise ValueError('expected call to be cancelled')
 
 
 class _Pipe(object):
@@ -220,18 +210,17 @@ class _Pipe(object):
 
 
 def _ping_pong(stub):
-  request_response_sizes = (31415, 9, 2653, 58979)
-  request_payload_sizes = (27182, 8, 1828, 45904)
+  request_response_sizes = (31415, 9, 2653, 58979,)
+  request_payload_sizes = (27182, 8, 1828, 45904,)
 
-  with stub, _Pipe() as pipe:
-    response_iterator = stub.FullDuplexCall(pipe, _TIMEOUT)
-    print('Starting ping-pong with response iterator %s' % response_iterator)
+  with _Pipe() as pipe:
+    response_iterator = stub.FullDuplexCall(pipe)
     for response_size, payload_size in zip(
         request_response_sizes, request_payload_sizes):
       request = messages_pb2.StreamingOutputCallRequest(
           response_type=messages_pb2.COMPRESSABLE,
-          response_parameters=(messages_pb2.ResponseParameters(
-              size=response_size),),
+          response_parameters=(
+              messages_pb2.ResponseParameters(size=response_size),),
           payload=messages_pb2.Payload(body=b'\x00' * payload_size))
       pipe.add(request)
       response = next(response_iterator)
@@ -244,17 +233,17 @@ def _ping_pong(stub):
 
 
 def _cancel_after_first_response(stub):
-  request_response_sizes = (31415, 9, 2653, 58979)
-  request_payload_sizes = (27182, 8, 1828, 45904)
-  with stub, _Pipe() as pipe:
-    response_iterator = stub.FullDuplexCall(pipe, _TIMEOUT)
+  request_response_sizes = (31415, 9, 2653, 58979,)
+  request_payload_sizes = (27182, 8, 1828, 45904,)
+  with _Pipe() as pipe:
+    response_iterator = stub.FullDuplexCall(pipe)
 
     response_size = request_response_sizes[0]
     payload_size = request_payload_sizes[0]
     request = messages_pb2.StreamingOutputCallRequest(
         response_type=messages_pb2.COMPRESSABLE,
-        response_parameters=(messages_pb2.ResponseParameters(
-            size=response_size),),
+        response_parameters=(
+            messages_pb2.ResponseParameters(size=response_size),),
         payload=messages_pb2.Payload(body=b'\x00' * payload_size))
     pipe.add(request)
     response = next(response_iterator)
@@ -264,16 +253,17 @@ def _cancel_after_first_response(stub):
 
     try:
       next(response_iterator)
-    except Exception:
-      pass
+    except grpc.RpcError as rpc_error:
+      if rpc_error.code() is not grpc.StatusCode.CANCELLED:
+        raise
     else:
       raise ValueError('expected call to be cancelled')
 
 
 def _timeout_on_sleeping_server(stub):
   request_payload_size = 27182
-  with stub, _Pipe() as pipe:
-    response_iterator = stub.FullDuplexCall(pipe, 0.001)
+  with _Pipe() as pipe:
+    response_iterator = stub.FullDuplexCall(pipe, timeout=0.001)
 
     request = messages_pb2.StreamingOutputCallRequest(
         response_type=messages_pb2.COMPRESSABLE,
@@ -282,15 +272,16 @@ def _timeout_on_sleeping_server(stub):
     time.sleep(0.1)
     try:
       next(response_iterator)
-    except face.ExpirationError:
-      pass
+    except grpc.RpcError as rpc_error:
+      if rpc_error.code() is not grpc.StatusCode.DEADLINE_EXCEEDED:
+        raise
     else:
       raise ValueError('expected call to exceed deadline')
 
 
 def _empty_stream(stub):
-  with stub, _Pipe() as pipe:
-    response_iterator = stub.FullDuplexCall(pipe, _TIMEOUT)
+  with _Pipe() as pipe:
+    response_iterator = stub.FullDuplexCall(pipe)
     pipe.close()
     try:
       next(response_iterator)
@@ -300,65 +291,64 @@ def _empty_stream(stub):
 
 
 def _status_code_and_message(stub):
-  with stub:
-    message = 'test status message'
-    code = 2
-    status = grpc.StatusCode.UNKNOWN # code = 2
-    request = messages_pb2.SimpleRequest(
-        response_type=messages_pb2.COMPRESSABLE,
-        response_size=1,
-        payload=messages_pb2.Payload(body=b'\x00'),
-        response_status=messages_pb2.EchoStatus(code=code, message=message)
-        )
-    response_future = stub.UnaryCall.future(request, _TIMEOUT)
-    if response_future.code() != status:
-      raise ValueError(
-        'expected code %s, got %s' % (status, response_future.code()))
-    if response_future.details() != message:
-      raise ValueError(
-        'expected message %s, got %s' % (message, response_future.details()))
-
-    request = messages_pb2.StreamingOutputCallRequest(
-        response_type=messages_pb2.COMPRESSABLE,
-        response_parameters=(
-            messages_pb2.ResponseParameters(size=1),),
-        response_status=messages_pb2.EchoStatus(code=code, message=message))
-    response_iterator = stub.StreamingOutputCall(request, _TIMEOUT)
-    if response_future.code() != status:
-      raise ValueError(
-        'expected code %s, got %s' % (status, response_iterator.code()))
-    if response_future.details() != message:
-      raise ValueError(
-        'expected message %s, got %s' % (message, response_iterator.details()))
+  message = 'test status message'
+  code = 2
+  status = grpc.StatusCode.UNKNOWN # code = 2
+  request = messages_pb2.SimpleRequest(
+      response_type=messages_pb2.COMPRESSABLE,
+      response_size=1,
+      payload=messages_pb2.Payload(body=b'\x00'),
+      response_status=messages_pb2.EchoStatus(code=code, message=message)
+  )
+  response_future = stub.UnaryCall.future(request)
+  if response_future.code() != status:
+    raise ValueError(
+      'expected code %s, got %s' % (status, response_future.code()))
+  elif response_future.details() != message:
+    raise ValueError(
+      'expected message %s, got %s' % (message, response_future.details()))
+
+  request = messages_pb2.StreamingOutputCallRequest(
+      response_type=messages_pb2.COMPRESSABLE,
+      response_parameters=(
+          messages_pb2.ResponseParameters(size=1),),
+      response_status=messages_pb2.EchoStatus(code=code, message=message))
+  response_iterator = stub.StreamingOutputCall(request)
+  if response_future.code() != status:
+    raise ValueError(
+      'expected code %s, got %s' % (status, response_iterator.code()))
+  elif response_future.details() != message:
+    raise ValueError(
+      'expected message %s, got %s' % (message, response_iterator.details()))
 
 
 def _compute_engine_creds(stub, args):
-  response = _large_unary_common_behavior(stub, True, True)
+  response = _large_unary_common_behavior(stub, True, True, None)
   if args.default_service_account != response.username:
     raise ValueError(
-        'expected username %s, got %s' % (args.default_service_account,
-                                          response.username))
+        'expected username %s, got %s' % (
+            args.default_service_account, response.username))
 
 
 def _oauth2_auth_token(stub, args):
   json_key_filename = os.environ[
       oauth2client_client.GOOGLE_APPLICATION_CREDENTIALS]
   wanted_email = json.load(open(json_key_filename, 'rb'))['client_email']
-  response = _large_unary_common_behavior(stub, True, True)
+  response = _large_unary_common_behavior(stub, True, True, None)
   if wanted_email != response.username:
     raise ValueError(
         'expected username %s, got %s' % (wanted_email, response.username))
   if args.oauth_scope.find(response.oauth_scope) == -1:
     raise ValueError(
-        'expected to find oauth scope "%s" in received "%s"' %
-        (response.oauth_scope, args.oauth_scope))
+        'expected to find oauth scope "{}" in received "{}"'.format(
+            response.oauth_scope, args.oauth_scope))
 
 
 def _jwt_token_creds(stub, args):
   json_key_filename = os.environ[
       oauth2client_client.GOOGLE_APPLICATION_CREDENTIALS]
   wanted_email = json.load(open(json_key_filename, 'rb'))['client_email']
-  response = _large_unary_common_behavior(stub, True, False)
+  response = _large_unary_common_behavior(stub, True, False, None)
   if wanted_email != response.username:
     raise ValueError(
         'expected username %s, got %s' % (wanted_email, response.username))
@@ -370,11 +360,11 @@ def _per_rpc_creds(stub, args):
   wanted_email = json.load(open(json_key_filename, 'rb'))['client_email']
   credentials = oauth2client_client.GoogleCredentials.get_application_default()
   scoped_credentials = credentials.create_scoped([args.oauth_scope])
-  call_creds = implementations.google_call_credentials(scoped_credentials)
-  options = interfaces.grpc_call_options(disable_compression=False,
-                                         credentials=call_creds)
-  response = _large_unary_common_behavior(stub, True, False,
-                                          protocol_options=options)
+  # TODO(https://github.com/grpc/grpc/issues/6799): Eliminate this last
+  # remaining use of the Beta API.
+  call_credentials = implementations.google_call_credentials(
+      scoped_credentials)
+  response = _large_unary_common_behavior(stub, True, False, call_credentials)
   if wanted_email != response.username:
     raise ValueError(
         'expected username %s, got %s' % (wanted_email, response.username))
diff --git a/src/python/grpcio_tests/tests/interop/server.py b/src/python/grpcio_tests/tests/interop/server.py
index ab2c3c708f..1ae83bc57d 100644
--- a/src/python/grpcio_tests/tests/interop/server.py
+++ b/src/python/grpcio_tests/tests/interop/server.py
@@ -30,10 +30,11 @@
 """The Python implementation of the GRPC interoperability test server."""
 
 import argparse
+from concurrent import futures
 import logging
 import time
 
-from grpc.beta import implementations
+import grpc
 from src.proto.grpc.testing import test_pb2
 
 from tests.interop import methods
@@ -51,12 +52,13 @@ def serve():
       default=False, type=resources.parse_bool)
   args = parser.parse_args()
 
-  server = test_pb2.beta_create_TestService_server(methods.TestService())
+  server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
+  test_pb2.add_TestServiceServicer_to_server(methods.TestService(), server)
   if args.use_tls:
     private_key = resources.private_key()
     certificate_chain = resources.certificate_chain()
-    credentials = implementations.ssl_server_credentials(
-        [(private_key, certificate_chain)])
+    credentials = grpc.ssl_server_credentials(
+        ((private_key, certificate_chain),))
     server.add_secure_port('[::]:{}'.format(args.port), credentials)
   else:
     server.add_insecure_port('[::]:{}'.format(args.port))
@@ -68,7 +70,7 @@ def serve():
       time.sleep(_ONE_DAY_IN_SECONDS)
   except BaseException as e:
     logging.info('Caught exception "%s"; stopping server...', e)
-    server.stop(0)
+    server.stop(None)
     logging.info('Server stopped; exiting.')
 
 if __name__ == '__main__':
diff --git a/src/python/grpcio_tests/tests/stress/client.py b/src/python/grpcio_tests/tests/stress/client.py
index 0de2532cd8..975f33b4c1 100644
--- a/src/python/grpcio_tests/tests/stress/client.py
+++ b/src/python/grpcio_tests/tests/stress/client.py
@@ -30,9 +30,10 @@
 """Entry point for running stress tests."""
 
 import argparse
+from concurrent import futures
 import threading
 
-from grpc.beta import implementations
+import grpc
 from six.moves import queue
 from src.proto.grpc.testing import metrics_pb2
 from src.proto.grpc.testing import test_pb2
@@ -92,24 +93,24 @@ def _parse_weighted_test_cases(test_case_args):
 
 def run_test(args):
   test_cases = _parse_weighted_test_cases(args.test_cases)
-  test_servers = args.server_addresses.split(',')
+  test_server_targets = args.server_addresses.split(',')
   # Propagate any client exceptions with a queue
   exception_queue = queue.Queue()
   stop_event = threading.Event()
   hist = histogram.Histogram(1, 1)
   runners = []
 
-  server = metrics_pb2.beta_create_MetricsService_server(
-      metrics_server.MetricsServer(hist))
+  server = grpc.server(futures.ThreadPoolExecutor(max_workers=25))
+  metrics_pb2.add_MetricsServiceServicer_to_server(
+      metrics_server.MetricsServer(hist), server)
   server.add_insecure_port('[::]:{}'.format(args.metrics_port))
   server.start()
 
-  for test_server in test_servers:
-    host, port = test_server.split(':', 1)
+  for test_server_target in test_server_targets:
     for _ in xrange(args.num_channels_per_server):
-      channel = implementations.insecure_channel(host, int(port))
+      channel = grpc.insecure_channel(test_server_target)
       for _ in xrange(args.num_stubs_per_channel):
-        stub = test_pb2.beta_create_TestService_stub(channel)
+        stub = test_pb2.TestServiceStub(channel)
         runner = test_runner.TestRunner(stub, test_cases, hist,
                                         exception_queue, stop_event)
         runners.append(runner)
@@ -128,8 +129,8 @@ def run_test(args):
     stop_event.set()
     for runner in runners:
       runner.join()
-      runner = None
-    server.stop(0)
+    runner = None
+    server.stop(None)
 
 if __name__ == '__main__':
   run_test(_args())
diff --git a/src/python/grpcio_tests/tests/stress/metrics_server.py b/src/python/grpcio_tests/tests/stress/metrics_server.py
index b994e4643e..33dd1d6f2a 100644
--- a/src/python/grpcio_tests/tests/stress/metrics_server.py
+++ b/src/python/grpcio_tests/tests/stress/metrics_server.py
@@ -36,7 +36,7 @@ from src.proto.grpc.testing import metrics_pb2
 GAUGE_NAME = 'python_overall_qps'
 
 
-class MetricsServer(metrics_pb2.BetaMetricsServiceServicer):
+class MetricsServer(metrics_pb2.MetricsServiceServicer):
 
   def __init__(self, histogram):
     self._start_time = time.time()
diff --git a/test/distrib/python/distribtest.py b/test/distrib/python/distribtest.py
index dc20688140..0125ee6a56 100644
--- a/test/distrib/python/distribtest.py
+++ b/test/distrib/python/distribtest.py
@@ -27,10 +27,10 @@
 # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 
-from grpc.beta import implementations
+import grpc
 
 # This code doesn't do much but makes sure the native extension is loaded
 # which is what we are testing here.
-channel = implementations.insecure_channel('localhost', 1000)
+channel = grpc.insecure_channel('localhost:1000')
 del channel
 print 'Success!'
-- 
GitLab