diff --git a/src/python/grpcio/tests/protoc_plugin/beta_python_plugin_test.py b/src/python/grpcio/tests/protoc_plugin/beta_python_plugin_test.py
index 3dc3042e38bff39f3e5258f9ef4226c021f6dc5a..7466f880597a388476936b0466c996590c83be96 100644
--- a/src/python/grpcio/tests/protoc_plugin/beta_python_plugin_test.py
+++ b/src/python/grpcio/tests/protoc_plugin/beta_python_plugin_test.py
@@ -59,11 +59,12 @@ STUB_FACTORY_IDENTIFIER = 'beta_create_TestService_stub'
 
 class _ServicerMethods(object):
 
-  def __init__(self, test_pb2):
+  def __init__(self, response_pb2, payload_pb2):
     self._condition = threading.Condition()
     self._paused = False
     self._fail = False
-    self._test_pb2 = test_pb2
+    self._response_pb2 = response_pb2
+    self._payload_pb2 = payload_pb2
 
   @contextlib.contextmanager
   def pause(self):  # pylint: disable=invalid-name
@@ -90,22 +91,22 @@ class _ServicerMethods(object):
         self._condition.wait()
 
   def UnaryCall(self, request, unused_rpc_context):
-    response = self._test_pb2.SimpleResponse()
-    response.payload.payload_type = self._test_pb2.COMPRESSABLE
+    response = self._response_pb2.SimpleResponse()
+    response.payload.payload_type = self._payload_pb2.COMPRESSABLE
     response.payload.payload_compressable = 'a' * request.response_size
     self._control()
     return response
 
   def StreamingOutputCall(self, request, unused_rpc_context):
     for parameter in request.response_parameters:
-      response = self._test_pb2.StreamingOutputCallResponse()
-      response.payload.payload_type = self._test_pb2.COMPRESSABLE
+      response = self._response_pb2.StreamingOutputCallResponse()
+      response.payload.payload_type = self._payload_pb2.COMPRESSABLE
       response.payload.payload_compressable = 'a' * parameter.size
       self._control()
       yield response
 
   def StreamingInputCall(self, request_iter, unused_rpc_context):
-    response = self._test_pb2.StreamingInputCallResponse()
+    response = self._response_pb2.StreamingInputCallResponse()
     aggregated_payload_size = 0
     for request in request_iter:
       aggregated_payload_size += len(request.payload.payload_compressable)
@@ -116,8 +117,8 @@ class _ServicerMethods(object):
   def FullDuplexCall(self, request_iter, unused_rpc_context):
     for request in request_iter:
       for parameter in request.response_parameters:
-        response = self._test_pb2.StreamingOutputCallResponse()
-        response.payload.payload_type = self._test_pb2.COMPRESSABLE
+        response = self._response_pb2.StreamingOutputCallResponse()
+        response.payload.payload_type = self._payload_pb2.COMPRESSABLE
         response.payload.payload_compressable = 'a' * parameter.size
         self._control()
         yield response
@@ -126,8 +127,8 @@ class _ServicerMethods(object):
     responses = []
     for request in request_iter:
       for parameter in request.response_parameters:
-        response = self._test_pb2.StreamingOutputCallResponse()
-        response.payload.payload_type = self._test_pb2.COMPRESSABLE
+        response = self._response_pb2.StreamingOutputCallResponse()
+        response.payload.payload_type = self._payload_pb2.COMPRESSABLE
         response.payload.payload_compressable = 'a' * parameter.size
         self._control()
         responses.append(response)
@@ -136,23 +137,25 @@ class _ServicerMethods(object):
 
 
 @contextlib.contextmanager
-def _CreateService(test_pb2):
+def _CreateService(service_pb2, response_pb2, payload_pb2):
   """Provides a servicer backend and a stub.
 
   The servicer is just the implementation of the actual servicer passed to the
   face player of the python RPC implementation; the two are detached.
 
   Args:
-    test_pb2: The test_pb2 module generated by this test.
+    service_pb2: The service_pb2 module generated by this test.
+    response_pb2: The response_pb2 module generated by this test
+    payload_pb2: The payload_pb2 module generated by this test
 
   Yields:
     A (servicer_methods, stub) pair where servicer_methods is the back-end of
       the service bound to the stub and and stub is the stub on which to invoke
       RPCs.
   """
-  servicer_methods = _ServicerMethods(test_pb2)
+  servicer_methods = _ServicerMethods(response_pb2, payload_pb2)
 
-  class Servicer(getattr(test_pb2, SERVICER_IDENTIFIER)):
+  class Servicer(getattr(service_pb2, SERVICER_IDENTIFIER)):
 
     def UnaryCall(self, request, context):
       return servicer_methods.UnaryCall(request, context)
@@ -170,55 +173,52 @@ def _CreateService(test_pb2):
       return servicer_methods.HalfDuplexCall(request_iter, context)
 
   servicer = Servicer()
-  server = getattr(test_pb2, SERVER_FACTORY_IDENTIFIER)(servicer)
+  server = getattr(service_pb2, SERVER_FACTORY_IDENTIFIER)(servicer)
   port = server.add_insecure_port('[::]:0')
   server.start()
   channel = implementations.insecure_channel('localhost', port)
-  stub = getattr(test_pb2, STUB_FACTORY_IDENTIFIER)(channel)
-  yield servicer_methods, stub
+  stub = getattr(service_pb2, STUB_FACTORY_IDENTIFIER)(channel)
+  yield (servicer_methods, stub)
   server.stop(0)
 
 
 @contextlib.contextmanager
-def _CreateIncompleteService(test_pb2):
+def _CreateIncompleteService(service_pb2):
   """Provides a servicer backend that fails to implement methods and its stub.
 
   The servicer is just the implementation of the actual servicer passed to the
   face player of the python RPC implementation; the two are detached.
-
   Args:
-    test_pb2: The test_pb2 module generated by this test.
-
+    service_pb2: The service_pb2 module generated by this test.
   Yields:
     A (servicer_methods, stub) pair where servicer_methods is the back-end of
       the service bound to the stub and and stub is the stub on which to invoke
       RPCs.
   """
-  servicer_methods = _ServicerMethods(test_pb2)
 
-  class Servicer(getattr(test_pb2, SERVICER_IDENTIFIER)):
+  class Servicer(getattr(service_pb2, SERVICER_IDENTIFIER)):
     pass
 
   servicer = Servicer()
-  server = getattr(test_pb2, SERVER_FACTORY_IDENTIFIER)(servicer)
+  server = getattr(service_pb2, SERVER_FACTORY_IDENTIFIER)(servicer)
   port = server.add_insecure_port('[::]:0')
   server.start()
   channel = implementations.insecure_channel('localhost', port)
-  stub = getattr(test_pb2, STUB_FACTORY_IDENTIFIER)(channel)
-  yield servicer_methods, stub
+  stub = getattr(service_pb2, STUB_FACTORY_IDENTIFIER)(channel)
+  yield None, stub
   server.stop(0)
 
 
-def _streaming_input_request_iterator(test_pb2):
+def _streaming_input_request_iterator(request_pb2, payload_pb2):
   for _ in range(3):
-    request = test_pb2.StreamingInputCallRequest()
-    request.payload.payload_type = test_pb2.COMPRESSABLE
+    request = request_pb2.StreamingInputCallRequest()
+    request.payload.payload_type = payload_pb2.COMPRESSABLE
     request.payload.payload_compressable = 'a'
     yield request
 
 
-def _streaming_output_request(test_pb2):
-  request = test_pb2.StreamingOutputCallRequest()
+def _streaming_output_request(request_pb2):
+  request = request_pb2.StreamingOutputCallRequest()
   sizes = [1, 2, 3]
   request.response_parameters.add(size=sizes[0], interval_us=0)
   request.response_parameters.add(size=sizes[1], interval_us=0)
@@ -226,11 +226,11 @@ def _streaming_output_request(test_pb2):
   return request
 
 
-def _full_duplex_request_iterator(test_pb2):
-  request = test_pb2.StreamingOutputCallRequest()
+def _full_duplex_request_iterator(request_pb2):
+  request = request_pb2.StreamingOutputCallRequest()
   request.response_parameters.add(size=1, interval_us=0)
   yield request
-  request = test_pb2.StreamingOutputCallRequest()
+  request = request_pb2.StreamingOutputCallRequest()
   request.response_parameters.add(size=2, interval_us=0)
   request.response_parameters.add(size=3, interval_us=0)
   yield request
@@ -250,8 +250,6 @@ class PythonPluginTest(unittest.TestCase):
     protoc_command = 'protoc'
     protoc_plugin_filename = distutils.spawn.find_executable(
         'grpc_python_plugin')
-    test_proto_filename = pkg_resources.resource_filename(
-        'tests.protoc_plugin', 'protoc_plugin_test.proto')
     if not os.path.isfile(protoc_command):
       # Assume that if we haven't built protoc that it's on the system.
       protoc_command = 'protoc'
@@ -259,19 +257,44 @@ class PythonPluginTest(unittest.TestCase):
     # Ensure that the output directory exists.
     self.outdir = tempfile.mkdtemp()
 
+    # Find all proto files
+    paths = []
+    root_dir = os.path.dirname(os.path.realpath(__file__))
+    proto_dir = os.path.join(root_dir, 'protos')
+    for walk_root, _, filenames in os.walk(proto_dir):
+      for filename in filenames:
+        if filename.endswith('.proto'):
+          path = os.path.join(walk_root, filename)
+          paths.append(path)
+
     # Invoke protoc with the plugin.
     cmd = [
         protoc_command,
         '--plugin=protoc-gen-python-grpc=%s' % protoc_plugin_filename,
-        '-I .',
+        '-I %s' % root_dir,
         '--python_out=%s' % self.outdir,
-        '--python-grpc_out=%s' % self.outdir,
-        os.path.basename(test_proto_filename),
-    ]
+        '--python-grpc_out=%s' % self.outdir
+    ] + paths
     subprocess.check_call(' '.join(cmd), shell=True, env=os.environ,
-                          cwd=os.path.dirname(test_proto_filename))
+                          cwd=os.path.dirname(os.path.realpath(__file__)))
+
+    # Generated proto directories dont include __init__.py, but
+    # these are needed for python package resolution
+    for walk_root, _, _ in os.walk(os.path.join(self.outdir, 'protos')):
+      path = os.path.join(walk_root, '__init__.py')
+      open(path, 'a').close()
+
     sys.path.insert(0, self.outdir)
 
+    import protos.payload.test_payload_pb2 as payload_pb2  # pylint: disable=g-import-not-at-top
+    import protos.requests.r.test_requests_pb2 as request_pb2  # pylint: disable=g-import-not-at-top
+    import protos.responses.test_responses_pb2 as response_pb2  # pylint: disable=g-import-not-at-top
+    import protos.service.test_service_pb2 as service_pb2  # pylint: disable=g-import-not-at-top
+    self._payload_pb2 = payload_pb2
+    self._request_pb2 = request_pb2
+    self._response_pb2 = response_pb2
+    self._service_pb2 = service_pb2
+
   def tearDown(self):
     try:
       shutil.rmtree(self.outdir)
@@ -282,43 +305,40 @@ class PythonPluginTest(unittest.TestCase):
 
   def testImportAttributes(self):
     # check that we can access the generated module and its members.
-    import protoc_plugin_test_pb2 as test_pb2  # pylint: disable=g-import-not-at-top
-    moves.reload_module(test_pb2)
-    self.assertIsNotNone(getattr(test_pb2, SERVICER_IDENTIFIER, None))
-    self.assertIsNotNone(getattr(test_pb2, STUB_IDENTIFIER, None))
-    self.assertIsNotNone(getattr(test_pb2, SERVER_FACTORY_IDENTIFIER, None))
-    self.assertIsNotNone(getattr(test_pb2, STUB_FACTORY_IDENTIFIER, None))
+    self.assertIsNotNone(
+        getattr(self._service_pb2, SERVICER_IDENTIFIER, None))
+    self.assertIsNotNone(
+        getattr(self._service_pb2, STUB_IDENTIFIER, None))
+    self.assertIsNotNone(
+        getattr(self._service_pb2, SERVER_FACTORY_IDENTIFIER, None))
+    self.assertIsNotNone(
+        getattr(self._service_pb2, STUB_FACTORY_IDENTIFIER, None))
 
   def testUpDown(self):
-    import protoc_plugin_test_pb2 as test_pb2
-    moves.reload_module(test_pb2)
-    with _CreateService(test_pb2) as (servicer, stub):
-      request = test_pb2.SimpleRequest(response_size=13)
+    with _CreateService(
+        self._service_pb2, self._response_pb2, self._payload_pb2):
+      self._request_pb2.SimpleRequest(response_size=13)
 
   def testIncompleteServicer(self):
-    import protoc_plugin_test_pb2 as test_pb2
-    moves.reload_module(test_pb2)
-    with _CreateIncompleteService(test_pb2) as (servicer, stub):
-      request = test_pb2.SimpleRequest(response_size=13)
+    with _CreateIncompleteService(self._service_pb2) as (_, stub):
+      request = self._request_pb2.SimpleRequest(response_size=13)
       try:
-        response = stub.UnaryCall(request, test_constants.LONG_TIMEOUT)
+        stub.UnaryCall(request, test_constants.LONG_TIMEOUT)
       except face.AbortionError as error:
         self.assertEqual(interfaces.StatusCode.UNIMPLEMENTED, error.code)
 
   def testUnaryCall(self):
-    import protoc_plugin_test_pb2 as test_pb2  # pylint: disable=g-import-not-at-top
-    moves.reload_module(test_pb2)
-    with _CreateService(test_pb2) as (methods, stub):
-      request = test_pb2.SimpleRequest(response_size=13)
+    with _CreateService(self._service_pb2, self._response_pb2,
+                        self._payload_pb2) as (methods, stub):
+      request = self._request_pb2.SimpleRequest(response_size=13)
       response = stub.UnaryCall(request, test_constants.LONG_TIMEOUT)
     expected_response = methods.UnaryCall(request, 'not a real context!')
     self.assertEqual(expected_response, response)
 
   def testUnaryCallFuture(self):
-    import protoc_plugin_test_pb2 as test_pb2  # pylint: disable=g-import-not-at-top
-    moves.reload_module(test_pb2)
-    request = test_pb2.SimpleRequest(response_size=13)
-    with _CreateService(test_pb2) as (methods, stub):
+    with _CreateService(self._service_pb2, self._response_pb2,
+                        self._payload_pb2) as (methods, stub):
+      request = self._request_pb2.SimpleRequest(response_size=13)
       # Check that the call does not block waiting for the server to respond.
       with methods.pause():
         response_future = stub.UnaryCall.future(
@@ -328,10 +348,9 @@ class PythonPluginTest(unittest.TestCase):
     self.assertEqual(expected_response, response)
 
   def testUnaryCallFutureExpired(self):
-    import protoc_plugin_test_pb2 as test_pb2  # pylint: disable=g-import-not-at-top
-    moves.reload_module(test_pb2)
-    with _CreateService(test_pb2) as (methods, stub):
-      request = test_pb2.SimpleRequest(response_size=13)
+    with _CreateService(self._service_pb2, self._response_pb2,
+                        self._payload_pb2) as (methods, stub):
+      request = self._request_pb2.SimpleRequest(response_size=13)
       with methods.pause():
         response_future = stub.UnaryCall.future(
             request, test_constants.SHORT_TIMEOUT)
@@ -339,30 +358,27 @@ class PythonPluginTest(unittest.TestCase):
           response_future.result()
 
   def testUnaryCallFutureCancelled(self):
-    import protoc_plugin_test_pb2 as test_pb2  # pylint: disable=g-import-not-at-top
-    moves.reload_module(test_pb2)
-    request = test_pb2.SimpleRequest(response_size=13)
-    with _CreateService(test_pb2) as (methods, stub):
+    with _CreateService(self._service_pb2, self._response_pb2,
+                        self._payload_pb2) as (methods, stub):
+      request = self._request_pb2.SimpleRequest(response_size=13)
       with methods.pause():
         response_future = stub.UnaryCall.future(request, 1)
         response_future.cancel()
         self.assertTrue(response_future.cancelled())
 
   def testUnaryCallFutureFailed(self):
-    import protoc_plugin_test_pb2 as test_pb2  # pylint: disable=g-import-not-at-top
-    moves.reload_module(test_pb2)
-    request = test_pb2.SimpleRequest(response_size=13)
-    with _CreateService(test_pb2) as (methods, stub):
+    with _CreateService(self._service_pb2, self._response_pb2,
+                        self._payload_pb2) as (methods, stub):
+      request = self._request_pb2.SimpleRequest(response_size=13)
       with methods.fail():
         response_future = stub.UnaryCall.future(
             request, test_constants.LONG_TIMEOUT)
         self.assertIsNotNone(response_future.exception())
 
   def testStreamingOutputCall(self):
-    import protoc_plugin_test_pb2 as test_pb2  # pylint: disable=g-import-not-at-top
-    moves.reload_module(test_pb2)
-    request = _streaming_output_request(test_pb2)
-    with _CreateService(test_pb2) as (methods, stub):
+    with _CreateService(self._service_pb2, self._response_pb2,
+                        self._payload_pb2) as (methods, stub):
+      request = _streaming_output_request(self._request_pb2)
       responses = stub.StreamingOutputCall(
           request, test_constants.LONG_TIMEOUT)
       expected_responses = methods.StreamingOutputCall(
@@ -372,10 +388,9 @@ class PythonPluginTest(unittest.TestCase):
         self.assertEqual(expected_response, response)
 
   def testStreamingOutputCallExpired(self):
-    import protoc_plugin_test_pb2 as test_pb2  # pylint: disable=g-import-not-at-top
-    moves.reload_module(test_pb2)
-    request = _streaming_output_request(test_pb2)
-    with _CreateService(test_pb2) as (methods, stub):
+    with _CreateService(self._service_pb2, self._response_pb2,
+                        self._payload_pb2) as (methods, stub):
+      request = _streaming_output_request(self._request_pb2)
       with methods.pause():
         responses = stub.StreamingOutputCall(
             request, test_constants.SHORT_TIMEOUT)
@@ -383,10 +398,9 @@ class PythonPluginTest(unittest.TestCase):
           list(responses)
 
   def testStreamingOutputCallCancelled(self):
-    import protoc_plugin_test_pb2 as test_pb2  # pylint: disable=g-import-not-at-top
-    moves.reload_module(test_pb2)
-    request = _streaming_output_request(test_pb2)
-    with _CreateService(test_pb2) as (unused_methods, stub):
+    with _CreateService(self._service_pb2, self._response_pb2,
+                        self._payload_pb2) as (methods, stub):
+      request = _streaming_output_request(self._request_pb2)
       responses = stub.StreamingOutputCall(
           request, test_constants.LONG_TIMEOUT)
       next(responses)
@@ -395,10 +409,9 @@ class PythonPluginTest(unittest.TestCase):
         next(responses)
 
   def testStreamingOutputCallFailed(self):
-    import protoc_plugin_test_pb2 as test_pb2  # pylint: disable=g-import-not-at-top
-    moves.reload_module(test_pb2)
-    request = _streaming_output_request(test_pb2)
-    with _CreateService(test_pb2) as (methods, stub):
+    with _CreateService(self._service_pb2, self._response_pb2,
+                        self._payload_pb2) as (methods, stub):
+      request = _streaming_output_request(self._request_pb2)
       with methods.fail():
         responses = stub.StreamingOutputCall(request, 1)
         self.assertIsNotNone(responses)
@@ -406,36 +419,38 @@ class PythonPluginTest(unittest.TestCase):
           next(responses)
 
   def testStreamingInputCall(self):
-    import protoc_plugin_test_pb2 as test_pb2  # pylint: disable=g-import-not-at-top
-    moves.reload_module(test_pb2)
-    with _CreateService(test_pb2) as (methods, stub):
+    with _CreateService(self._service_pb2, self._response_pb2,
+                        self._payload_pb2) as (methods, stub):
       response = stub.StreamingInputCall(
-          _streaming_input_request_iterator(test_pb2),
+          _streaming_input_request_iterator(
+              self._request_pb2, self._payload_pb2),
           test_constants.LONG_TIMEOUT)
     expected_response = methods.StreamingInputCall(
-        _streaming_input_request_iterator(test_pb2), 'not a real RpcContext!')
+        _streaming_input_request_iterator(self._request_pb2, self._payload_pb2),
+        'not a real RpcContext!')
     self.assertEqual(expected_response, response)
 
   def testStreamingInputCallFuture(self):
-    import protoc_plugin_test_pb2 as test_pb2  # pylint: disable=g-import-not-at-top
-    moves.reload_module(test_pb2)
-    with _CreateService(test_pb2) as (methods, stub):
+    with _CreateService(self._service_pb2, self._response_pb2,
+                        self._payload_pb2) as (methods, stub):
       with methods.pause():
         response_future = stub.StreamingInputCall.future(
-            _streaming_input_request_iterator(test_pb2),
+            _streaming_input_request_iterator(
+                self._request_pb2, self._payload_pb2),
             test_constants.LONG_TIMEOUT)
       response = response_future.result()
     expected_response = methods.StreamingInputCall(
-        _streaming_input_request_iterator(test_pb2), 'not a real RpcContext!')
+        _streaming_input_request_iterator(self._request_pb2, self._payload_pb2),
+        'not a real RpcContext!')
     self.assertEqual(expected_response, response)
 
   def testStreamingInputCallFutureExpired(self):
-    import protoc_plugin_test_pb2 as test_pb2  # pylint: disable=g-import-not-at-top
-    moves.reload_module(test_pb2)
-    with _CreateService(test_pb2) as (methods, stub):
+    with _CreateService(self._service_pb2, self._response_pb2,
+                        self._payload_pb2) as (methods, stub):
       with methods.pause():
         response_future = stub.StreamingInputCall.future(
-            _streaming_input_request_iterator(test_pb2),
+            _streaming_input_request_iterator(
+                self._request_pb2, self._payload_pb2),
             test_constants.SHORT_TIMEOUT)
         with self.assertRaises(face.ExpirationError):
           response_future.result()
@@ -443,12 +458,12 @@ class PythonPluginTest(unittest.TestCase):
             response_future.exception(), face.ExpirationError)
 
   def testStreamingInputCallFutureCancelled(self):
-    import protoc_plugin_test_pb2 as test_pb2  # pylint: disable=g-import-not-at-top
-    moves.reload_module(test_pb2)
-    with _CreateService(test_pb2) as (methods, stub):
+    with _CreateService(self._service_pb2, self._response_pb2,
+                        self._payload_pb2) as (methods, stub):
       with methods.pause():
         response_future = stub.StreamingInputCall.future(
-            _streaming_input_request_iterator(test_pb2),
+            _streaming_input_request_iterator(
+                self._request_pb2, self._payload_pb2),
             test_constants.LONG_TIMEOUT)
         response_future.cancel()
         self.assertTrue(response_future.cancelled())
@@ -456,32 +471,32 @@ class PythonPluginTest(unittest.TestCase):
         response_future.result()
 
   def testStreamingInputCallFutureFailed(self):
-    import protoc_plugin_test_pb2 as test_pb2  # pylint: disable=g-import-not-at-top
-    moves.reload_module(test_pb2)
-    with _CreateService(test_pb2) as (methods, stub):
+    with _CreateService(self._service_pb2, self._response_pb2,
+                        self._payload_pb2) as (methods, stub):
       with methods.fail():
         response_future = stub.StreamingInputCall.future(
-            _streaming_input_request_iterator(test_pb2),
+            _streaming_input_request_iterator(
+                self._request_pb2, self._payload_pb2),
             test_constants.LONG_TIMEOUT)
         self.assertIsNotNone(response_future.exception())
 
   def testFullDuplexCall(self):
-    import protoc_plugin_test_pb2 as test_pb2  # pylint: disable=g-import-not-at-top
-    moves.reload_module(test_pb2)
-    with _CreateService(test_pb2) as (methods, stub):
+    with _CreateService(self._service_pb2, self._response_pb2,
+                        self._payload_pb2) as (methods, stub):
       responses = stub.FullDuplexCall(
-          _full_duplex_request_iterator(test_pb2), test_constants.LONG_TIMEOUT)
+          _full_duplex_request_iterator(self._request_pb2),
+          test_constants.LONG_TIMEOUT)
       expected_responses = methods.FullDuplexCall(
-          _full_duplex_request_iterator(test_pb2), 'not a real RpcContext!')
+          _full_duplex_request_iterator(self._request_pb2),
+          'not a real RpcContext!')
       for expected_response, response in moves.zip_longest(
           expected_responses, responses):
         self.assertEqual(expected_response, response)
 
   def testFullDuplexCallExpired(self):
-    import protoc_plugin_test_pb2 as test_pb2  # pylint: disable=g-import-not-at-top
-    moves.reload_module(test_pb2)
-    request_iterator = _full_duplex_request_iterator(test_pb2)
-    with _CreateService(test_pb2) as (methods, stub):
+    request_iterator = _full_duplex_request_iterator(self._request_pb2)
+    with _CreateService(self._service_pb2, self._response_pb2,
+                        self._payload_pb2) as (methods, stub):
       with methods.pause():
         responses = stub.FullDuplexCall(
             request_iterator, test_constants.SHORT_TIMEOUT)
@@ -489,10 +504,9 @@ class PythonPluginTest(unittest.TestCase):
           list(responses)
 
   def testFullDuplexCallCancelled(self):
-    import protoc_plugin_test_pb2 as test_pb2  # pylint: disable=g-import-not-at-top
-    moves.reload_module(test_pb2)
-    with _CreateService(test_pb2) as (methods, stub):
-      request_iterator = _full_duplex_request_iterator(test_pb2)
+    with _CreateService(self._service_pb2, self._response_pb2,
+                        self._payload_pb2) as (methods, stub):
+      request_iterator = _full_duplex_request_iterator(self._request_pb2)
       responses = stub.FullDuplexCall(
           request_iterator, test_constants.LONG_TIMEOUT)
       next(responses)
@@ -501,10 +515,9 @@ class PythonPluginTest(unittest.TestCase):
         next(responses)
 
   def testFullDuplexCallFailed(self):
-    import protoc_plugin_test_pb2 as test_pb2  # pylint: disable=g-import-not-at-top
-    moves.reload_module(test_pb2)
-    request_iterator = _full_duplex_request_iterator(test_pb2)
-    with _CreateService(test_pb2) as (methods, stub):
+    request_iterator = _full_duplex_request_iterator(self._request_pb2)
+    with _CreateService(self._service_pb2, self._response_pb2,
+                        self._payload_pb2) as (methods, stub):
       with methods.fail():
         responses = stub.FullDuplexCall(
             request_iterator, test_constants.LONG_TIMEOUT)
@@ -513,14 +526,13 @@ class PythonPluginTest(unittest.TestCase):
           next(responses)
 
   def testHalfDuplexCall(self):
-    import protoc_plugin_test_pb2 as test_pb2  # pylint: disable=g-import-not-at-top
-    moves.reload_module(test_pb2)
-    with _CreateService(test_pb2) as (methods, stub):
+    with _CreateService(self._service_pb2, self._response_pb2,
+                        self._payload_pb2) as (methods, stub):
       def half_duplex_request_iterator():
-        request = test_pb2.StreamingOutputCallRequest()
+        request = self._request_pb2.StreamingOutputCallRequest()
         request.response_parameters.add(size=1, interval_us=0)
         yield request
-        request = test_pb2.StreamingOutputCallRequest()
+        request = self._request_pb2.StreamingOutputCallRequest()
         request.response_parameters.add(size=2, interval_us=0)
         request.response_parameters.add(size=3, interval_us=0)
         yield request
@@ -533,8 +545,6 @@ class PythonPluginTest(unittest.TestCase):
         self.assertEqual(expected_response, response)
 
   def testHalfDuplexCallWedged(self):
-    import protoc_plugin_test_pb2 as test_pb2  # pylint: disable=g-import-not-at-top
-    moves.reload_module(test_pb2)
     condition = threading.Condition()
     wait_cell = [False]
     @contextlib.contextmanager
@@ -547,13 +557,14 @@ class PythonPluginTest(unittest.TestCase):
         wait_cell[0] = False
         condition.notify_all()
     def half_duplex_request_iterator():
-      request = test_pb2.StreamingOutputCallRequest()
+      request = self._request_pb2.StreamingOutputCallRequest()
       request.response_parameters.add(size=1, interval_us=0)
       yield request
       with condition:
         while wait_cell[0]:
           condition.wait()
-    with _CreateService(test_pb2) as (methods, stub):
+    with _CreateService(self._service_pb2, self._response_pb2,
+                        self._payload_pb2) as (methods, stub):
       with wait():
         responses = stub.HalfDuplexCall(
             half_duplex_request_iterator(), test_constants.SHORT_TIMEOUT)
@@ -563,5 +574,5 @@ class PythonPluginTest(unittest.TestCase):
 
 
 if __name__ == '__main__':
-  os.chdir(os.path.dirname(sys.argv[0]))
+  #os.chdir(os.path.dirname(sys.argv[0]))
   unittest.main(verbosity=2)
diff --git a/src/python/grpcio/tests/protoc_plugin/protos/payload/test_payload.proto b/src/python/grpcio/tests/protoc_plugin/protos/payload/test_payload.proto
new file mode 100644
index 0000000000000000000000000000000000000000..457543aa793afa9d9b5ee2cf9c756e66a040b458
--- /dev/null
+++ b/src/python/grpcio/tests/protoc_plugin/protos/payload/test_payload.proto
@@ -0,0 +1,51 @@
+// Copyright 2016, Google Inc.
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+//     * Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+//     * Redistributions in binary form must reproduce the above
+// copyright notice, this list of conditions and the following disclaimer
+// in the documentation and/or other materials provided with the
+// distribution.
+//     * Neither the name of Google Inc. nor the names of its
+// contributors may be used to endorse or promote products derived from
+// this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+syntax = "proto3";
+
+package grpc_protoc_plugin;
+
+enum PayloadType {
+  // Compressable text format.
+  COMPRESSABLE= 0;
+
+  // Uncompressable binary format.
+  UNCOMPRESSABLE = 1;
+
+  // Randomly chosen from all other formats defined in this enum.
+  RANDOM = 2;
+}
+
+message Payload {
+  PayloadType payload_type = 1;
+  oneof payload_body {
+    string payload_compressable = 2;
+    bytes payload_uncompressable = 3;
+  }
+}
diff --git a/src/python/grpcio/tests/protoc_plugin/protos/requests/r/test_requests.proto b/src/python/grpcio/tests/protoc_plugin/protos/requests/r/test_requests.proto
new file mode 100644
index 0000000000000000000000000000000000000000..54105df6a56c4bc717a381e896550ea43c974dcd
--- /dev/null
+++ b/src/python/grpcio/tests/protoc_plugin/protos/requests/r/test_requests.proto
@@ -0,0 +1,77 @@
+// Copyright 2016, Google Inc.
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+//     * Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+//     * Redistributions in binary form must reproduce the above
+// copyright notice, this list of conditions and the following disclaimer
+// in the documentation and/or other materials provided with the
+// distribution.
+//     * Neither the name of Google Inc. nor the names of its
+// contributors may be used to endorse or promote products derived from
+// this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+syntax = "proto3";
+
+import "protos/payload/test_payload.proto";
+
+package grpc_protoc_plugin;
+
+message SimpleRequest {
+  // Desired payload type in the response from the server.
+  // If response_type is RANDOM, server randomly chooses one from other formats.
+  PayloadType response_type = 1;
+
+  // Desired payload size in the response from the server.
+  // If response_type is COMPRESSABLE, this denotes the size before compression.
+  int32 response_size = 2;
+
+  // input payload sent along with the request.
+  Payload payload = 3;
+}
+
+message StreamingInputCallRequest {
+  // input payload sent along with the request.
+  Payload payload = 1;
+
+  // Not expecting any payload from the response.
+}
+
+message ResponseParameters {
+  // Desired payload sizes in responses from the server.
+  // If response_type is COMPRESSABLE, this denotes the size before compression.
+  int32 size = 1;
+
+  // Desired interval between consecutive responses in the response stream in
+  // microseconds.
+  int32 interval_us = 2;
+}
+
+message StreamingOutputCallRequest {
+  // Desired payload type in the response from the server.
+  // If response_type is RANDOM, the payload from each response in the stream
+  // might be of different types. This is to simulate a mixed type of payload
+  // stream.
+  PayloadType response_type = 1;
+
+  repeated ResponseParameters response_parameters = 2;
+
+  // input payload sent along with the request.
+  Payload payload = 3;
+}
diff --git a/src/python/grpcio/tests/protoc_plugin/protos/responses/test_responses.proto b/src/python/grpcio/tests/protoc_plugin/protos/responses/test_responses.proto
new file mode 100644
index 0000000000000000000000000000000000000000..734fbda86e9411db250317f0d0256a5bb2b9fe72
--- /dev/null
+++ b/src/python/grpcio/tests/protoc_plugin/protos/responses/test_responses.proto
@@ -0,0 +1,47 @@
+// Copyright 2015, Google Inc.
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+//     * Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+//     * Redistributions in binary form must reproduce the above
+// copyright notice, this list of conditions and the following disclaimer
+// in the documentation and/or other materials provided with the
+// distribution.
+//     * Neither the name of Google Inc. nor the names of its
+// contributors may be used to endorse or promote products derived from
+// this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+syntax = "proto3";
+
+import "protos/payload/test_payload.proto";
+
+package grpc_protoc_plugin;
+
+message SimpleResponse {
+  Payload payload = 1;
+}
+
+message StreamingInputCallResponse {
+  // Aggregated size of payloads received from the client.
+  int32 aggregated_payload_size = 1;
+}
+
+message StreamingOutputCallResponse {
+  Payload payload = 1;
+}
diff --git a/src/python/grpcio/tests/protoc_plugin/protoc_plugin_test.proto b/src/python/grpcio/tests/protoc_plugin/protos/service/test_service.proto
similarity index 54%
rename from src/python/grpcio/tests/protoc_plugin/protoc_plugin_test.proto
rename to src/python/grpcio/tests/protoc_plugin/protos/service/test_service.proto
index 6762a8e7f35071b37375712866c387328c7a2d35..fe715ee7f9205d6ec938d1659230e7daea1e919a 100644
--- a/src/python/grpcio/tests/protoc_plugin/protoc_plugin_test.proto
+++ b/src/python/grpcio/tests/protoc_plugin/protos/service/test_service.proto
@@ -1,4 +1,4 @@
-// Copyright 2015, Google Inc.
+// Copyright 2016, Google Inc.
 // All rights reserved.
 //
 // Redistribution and use in source and binary forms, with or without
@@ -27,87 +27,12 @@
 // (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 // OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 
-// An integration test service that covers all the method signature permutations
-// of unary/streaming requests/responses.
-// This file is duplicated around the code base. See GitHub issue #526.
-syntax = "proto2";
+syntax = "proto3";
 
-package grpc_protoc_plugin;
-
-enum PayloadType {
-  // Compressable text format.
-  COMPRESSABLE= 1;
-
-  // Uncompressable binary format.
-  UNCOMPRESSABLE = 2;
-
-  // Randomly chosen from all other formats defined in this enum.
-  RANDOM = 3;
-}
-
-message Payload {
-  required PayloadType payload_type = 1;
-  oneof payload_body {
-    string payload_compressable = 2;
-    bytes payload_uncompressable = 3;
-  }
-}
-
-message SimpleRequest {
-  // Desired payload type in the response from the server.
-  // If response_type is RANDOM, server randomly chooses one from other formats.
-  optional PayloadType response_type = 1 [default=COMPRESSABLE];
-
-  // Desired payload size in the response from the server.
-  // If response_type is COMPRESSABLE, this denotes the size before compression.
-  optional int32 response_size = 2;
-
-  // Optional input payload sent along with the request.
-  optional Payload payload = 3;
-}
-
-message SimpleResponse {
-  optional Payload payload = 1;
-}
-
-message StreamingInputCallRequest {
-  // Optional input payload sent along with the request.
-  optional Payload payload = 1;
+import "protos/requests/r/test_requests.proto";
+import "protos/responses/test_responses.proto";
 
-  // Not expecting any payload from the response.
-}
-
-message StreamingInputCallResponse {
-  // Aggregated size of payloads received from the client.
-  optional int32 aggregated_payload_size = 1;
-}
-
-message ResponseParameters {
-  // Desired payload sizes in responses from the server.
-  // If response_type is COMPRESSABLE, this denotes the size before compression.
-  required int32 size = 1;
-
-  // Desired interval between consecutive responses in the response stream in
-  // microseconds.
-  required int32 interval_us = 2;
-}
-
-message StreamingOutputCallRequest {
-  // Desired payload type in the response from the server.
-  // If response_type is RANDOM, the payload from each response in the stream
-  // might be of different types. This is to simulate a mixed type of payload
-  // stream.
-  optional PayloadType response_type = 1 [default=COMPRESSABLE];
-
-  repeated ResponseParameters response_parameters = 2;
-
-  // Optional input payload sent along with the request.
-  optional Payload payload = 3;
-}
-
-message StreamingOutputCallResponse {
-  optional Payload payload = 1;
-}
+package grpc_protoc_plugin;
 
 service TestService {
   // One request followed by one response.