diff --git a/test/compiler/python_plugin_test.py b/test/compiler/python_plugin_test.py
index 3919de14509b3bafef97424f9673670fce5128c5..1981f49fbb87c0b58c95fb008b5650b62d9f8e3d 100644
--- a/test/compiler/python_plugin_test.py
+++ b/test/compiler/python_plugin_test.py
@@ -57,7 +57,6 @@ LONG_DELAY = 1
 
 # Assigned in __main__.
 _build_mode = None
-_port = None
 
 
 class _ServicerMethods(object):
@@ -87,14 +86,14 @@ class _ServicerMethods(object):
     while self._paused:
       time.sleep(0)
 
-  def UnaryCall(self, request, context):
+  def UnaryCall(self, request, unused_context):
     response = self.test_pb2.SimpleResponse()
     response.payload.payload_type = self.test_pb2.COMPRESSABLE
     response.payload.payload_compressable = 'a' * request.response_size
     self._control()
     return response
 
-  def StreamingOutputCall(self, request, context):
+  def StreamingOutputCall(self, request, unused_context):
     for parameter in request.response_parameters:
       response = self.test_pb2.StreamingOutputCallResponse()
       response.payload.payload_type = self.test_pb2.COMPRESSABLE
@@ -102,7 +101,7 @@ class _ServicerMethods(object):
       self._control()
       yield response
 
-  def StreamingInputCall(self, request_iter, context):
+  def StreamingInputCall(self, request_iter, unused_context):
     response = self.test_pb2.StreamingInputCallResponse()
     aggregated_payload_size = 0
     for request in request_iter:
@@ -111,7 +110,7 @@ class _ServicerMethods(object):
     self._control()
     return response
 
-  def FullDuplexCall(self, request_iter, context):
+  def FullDuplexCall(self, request_iter, unused_context):
     for request in request_iter:
       for parameter in request.response_parameters:
         response = self.test_pb2.StreamingOutputCallResponse()
@@ -120,7 +119,7 @@ class _ServicerMethods(object):
         self._control()
         yield response
 
-  def HalfDuplexCall(self, request_iter, context):
+  def HalfDuplexCall(self, request_iter, unused_context):
     responses = []
     for request in request_iter:
       for parameter in request.response_parameters:
@@ -133,6 +132,7 @@ class _ServicerMethods(object):
       yield response
 
 
+@contextlib.contextmanager
 def _CreateService(test_pb2, delay):
   """Provides a servicer backend and a stub.
 
@@ -148,9 +148,11 @@ def _CreateService(test_pb2, delay):
     test_pb2: the test_pb2 module generated by this test
     delay: delay in seconds per response from the servicer
     timeout: how long the stub will wait for the servicer by default.
-  Returns:
-    A two-tuple (servicer, stub), where the servicer is the back-end of the
-      service bound to the stub.
+
+  Yields:
+    A three-tuple (servicer_methods, servicer, stub), where the servicer is
+      the back-end of the service bound to the stub and the server and stub
+      are both activated and ready for use.
   """
   servicer_methods = _ServicerMethods(test_pb2, delay)
 
@@ -172,10 +174,13 @@ def _CreateService(test_pb2, delay):
       return servicer_methods.HalfDuplexCall(request_iter, context)
 
   servicer = Servicer()
-  server = getattr(test_pb2, SERVER_FACTORY_IDENTIFIER)(servicer, _port,
-                                                        None, None)
-  stub = getattr(test_pb2, STUB_FACTORY_IDENTIFIER)('localhost', _port)
-  return servicer_methods, stub, server
+  server = getattr(
+      test_pb2, SERVER_FACTORY_IDENTIFIER)(servicer, 0, None, None)
+  with server:
+    port = server.port()
+    stub = getattr(test_pb2, STUB_FACTORY_IDENTIFIER)('localhost', port)
+    with stub:
+      yield servicer_methods, stub, server
 
 
 def StreamingInputRequest(test_pb2):
@@ -255,25 +260,23 @@ class PythonPluginTest(unittest.TestCase):
 
   def testUpDown(self):
     import test_pb2
-    servicer, stub, server = _CreateService(test_pb2, DOES_NOT_MATTER_DELAY)
-    request = test_pb2.SimpleRequest(response_size=13)
-    with server, stub:
-      pass
+    with _CreateService(
+        test_pb2, DOES_NOT_MATTER_DELAY) as (servicer, stub, unused_server):
+      request = test_pb2.SimpleRequest(response_size=13)
 
   def testUnaryCall(self):
     import test_pb2  # pylint: disable=g-import-not-at-top
-    servicer, stub, server = _CreateService(test_pb2, NO_DELAY)
-    request = test_pb2.SimpleRequest(response_size=13)
-    with server, stub:
+    with _CreateService(test_pb2, NO_DELAY) as (servicer, stub, unused_server):
+      request = test_pb2.SimpleRequest(response_size=13)
       response = stub.UnaryCall(request, NORMAL_TIMEOUT)
     expected_response = servicer.UnaryCall(request, None)
     self.assertEqual(expected_response, response)
 
   def testUnaryCallAsync(self):
     import test_pb2  # pylint: disable=g-import-not-at-top
-    servicer, stub, server = _CreateService(test_pb2, LONG_DELAY)
     request = test_pb2.SimpleRequest(response_size=13)
-    with server, stub:
+    with _CreateService(test_pb2, LONG_DELAY) as (
+        servicer, stub, unused_server):
       start_time = time.clock()
       response_future = stub.UnaryCall.async(request, LONG_TIMEOUT)
       # Check that we didn't block on the asynchronous call.
@@ -285,10 +288,9 @@ class PythonPluginTest(unittest.TestCase):
   def testUnaryCallAsyncExpired(self):
     import test_pb2  # pylint: disable=g-import-not-at-top
     # set the timeout super low...
-    servicer, stub, server = _CreateService(test_pb2,
-                                            delay=DOES_NOT_MATTER_DELAY)
-    request = test_pb2.SimpleRequest(response_size=13)
-    with server, stub:
+    with _CreateService(test_pb2, DOES_NOT_MATTER_DELAY) as (
+        servicer, stub, unused_server):
+      request = test_pb2.SimpleRequest(response_size=13)
       with servicer.pause():
         response_future = stub.UnaryCall.async(request, SHORT_TIMEOUT)
         with self.assertRaises(exceptions.ExpirationError):
@@ -296,9 +298,9 @@ class PythonPluginTest(unittest.TestCase):
 
   def testUnaryCallAsyncCancelled(self):
     import test_pb2  # pylint: disable=g-import-not-at-top
-    servicer, stub, server = _CreateService(test_pb2, DOES_NOT_MATTER_DELAY)
     request = test_pb2.SimpleRequest(response_size=13)
-    with server, stub:
+    with _CreateService(test_pb2, DOES_NOT_MATTER_DELAY) as (
+        servicer, stub, unused_server):
       with servicer.pause():
         response_future = stub.UnaryCall.async(request, 1)
         response_future.cancel()
@@ -306,18 +308,17 @@ class PythonPluginTest(unittest.TestCase):
 
   def testUnaryCallAsyncFailed(self):
     import test_pb2  # pylint: disable=g-import-not-at-top
-    servicer, stub, server = _CreateService(test_pb2, DOES_NOT_MATTER_DELAY)
     request = test_pb2.SimpleRequest(response_size=13)
-    with server, stub:
+    with _CreateService(test_pb2, DOES_NOT_MATTER_DELAY) as (
+        servicer, stub, unused_server):
       with servicer.fail():
         response_future = stub.UnaryCall.async(request, NORMAL_TIMEOUT)
         self.assertIsNotNone(response_future.exception())
 
   def testStreamingOutputCall(self):
     import test_pb2  # pylint: disable=g-import-not-at-top
-    servicer, stub, server = _CreateService(test_pb2, NO_DELAY)
     request = StreamingOutputRequest(test_pb2)
-    with server, stub:
+    with _CreateService(test_pb2, NO_DELAY) as (servicer, stub, unused_server):
       responses = stub.StreamingOutputCall(request, NORMAL_TIMEOUT)
       expected_responses = servicer.StreamingOutputCall(request, None)
       for check in itertools.izip_longest(expected_responses, responses):
@@ -326,9 +327,9 @@ class PythonPluginTest(unittest.TestCase):
 
   def testStreamingOutputCallExpired(self):
     import test_pb2  # pylint: disable=g-import-not-at-top
-    servicer, stub, server = _CreateService(test_pb2, DOES_NOT_MATTER_DELAY)
     request = StreamingOutputRequest(test_pb2)
-    with server, stub:
+    with _CreateService(test_pb2, DOES_NOT_MATTER_DELAY) as (
+        servicer, stub, unused_server):
       with servicer.pause():
         responses = stub.StreamingOutputCall(request, SHORT_TIMEOUT)
         with self.assertRaises(exceptions.ExpirationError):
@@ -336,10 +337,9 @@ class PythonPluginTest(unittest.TestCase):
 
   def testStreamingOutputCallCancelled(self):
     import test_pb2  # pylint: disable=g-import-not-at-top
-    unused_servicer, stub, server = _CreateService(test_pb2,
-                                                   DOES_NOT_MATTER_DELAY)
     request = StreamingOutputRequest(test_pb2)
-    with server, stub:
+    with _CreateService(test_pb2, DOES_NOT_MATTER_DELAY) as (
+        unused_servicer, stub, unused_server):
       responses = stub.StreamingOutputCall(request, SHORT_TIMEOUT)
       next(responses)
       responses.cancel()
@@ -350,9 +350,9 @@ class PythonPluginTest(unittest.TestCase):
                  'instead of raising the proper error.')
   def testStreamingOutputCallFailed(self):
     import test_pb2  # pylint: disable=g-import-not-at-top
-    servicer, stub, server = _CreateService(test_pb2, DOES_NOT_MATTER_DELAY)
     request = StreamingOutputRequest(test_pb2)
-    with server, stub:
+    with _CreateService(test_pb2, DOES_NOT_MATTER_DELAY) as (
+        servicer, stub, unused_server):
       with servicer.fail():
         responses = stub.StreamingOutputCall(request, 1)
         self.assertIsNotNone(responses)
@@ -361,8 +361,7 @@ class PythonPluginTest(unittest.TestCase):
 
   def testStreamingInputCall(self):
     import test_pb2  # pylint: disable=g-import-not-at-top
-    servicer, stub, server = _CreateService(test_pb2, NO_DELAY)
-    with server, stub:
+    with _CreateService(test_pb2, NO_DELAY) as (servicer, stub, unused_server):
       response = stub.StreamingInputCall(StreamingInputRequest(test_pb2),
                                          NORMAL_TIMEOUT)
     expected_response = servicer.StreamingInputCall(
@@ -371,9 +370,8 @@ class PythonPluginTest(unittest.TestCase):
 
   def testStreamingInputCallAsync(self):
     import test_pb2  # pylint: disable=g-import-not-at-top
-    servicer, stub, server = _CreateService(
-        test_pb2, LONG_DELAY)
-    with server, stub:
+    with _CreateService(test_pb2, LONG_DELAY) as (
+        servicer, stub, unused_server):
       start_time = time.clock()
       response_future = stub.StreamingInputCall.async(
           StreamingInputRequest(test_pb2), LONG_TIMEOUT)
@@ -386,8 +384,8 @@ class PythonPluginTest(unittest.TestCase):
   def testStreamingInputCallAsyncExpired(self):
     import test_pb2  # pylint: disable=g-import-not-at-top
     # set the timeout super low...
-    servicer, stub, server = _CreateService(test_pb2, DOES_NOT_MATTER_DELAY)
-    with server, stub:
+    with _CreateService(test_pb2, DOES_NOT_MATTER_DELAY) as (
+        servicer, stub, unused_server):
       with servicer.pause():
         response_future = stub.StreamingInputCall.async(
             StreamingInputRequest(test_pb2), SHORT_TIMEOUT)
@@ -398,8 +396,8 @@ class PythonPluginTest(unittest.TestCase):
 
   def testStreamingInputCallAsyncCancelled(self):
     import test_pb2  # pylint: disable=g-import-not-at-top
-    servicer, stub, server = _CreateService(test_pb2, DOES_NOT_MATTER_DELAY)
-    with server, stub:
+    with _CreateService(test_pb2, DOES_NOT_MATTER_DELAY) as (
+        servicer, stub, unused_server):
       with servicer.pause():
         response_future = stub.StreamingInputCall.async(
             StreamingInputRequest(test_pb2), NORMAL_TIMEOUT)
@@ -410,8 +408,8 @@ class PythonPluginTest(unittest.TestCase):
 
   def testStreamingInputCallAsyncFailed(self):
     import test_pb2  # pylint: disable=g-import-not-at-top
-    servicer, stub, server = _CreateService(test_pb2, DOES_NOT_MATTER_DELAY)
-    with server, stub:
+    with _CreateService(test_pb2, DOES_NOT_MATTER_DELAY) as (
+        servicer, stub, unused_server):
       with servicer.fail():
         response_future = stub.StreamingInputCall.async(
             StreamingInputRequest(test_pb2), SHORT_TIMEOUT)
@@ -419,8 +417,7 @@ class PythonPluginTest(unittest.TestCase):
 
   def testFullDuplexCall(self):
     import test_pb2  # pylint: disable=g-import-not-at-top
-    servicer, stub, server = _CreateService(test_pb2, NO_DELAY)
-    with server, stub:
+    with _CreateService(test_pb2, NO_DELAY) as (servicer, stub, unused_server):
       responses = stub.FullDuplexCall(FullDuplexRequest(test_pb2),
                                       NORMAL_TIMEOUT)
       expected_responses = servicer.FullDuplexCall(FullDuplexRequest(test_pb2),
@@ -431,9 +428,9 @@ class PythonPluginTest(unittest.TestCase):
 
   def testFullDuplexCallExpired(self):
     import test_pb2  # pylint: disable=g-import-not-at-top
-    servicer, stub, server = _CreateService(test_pb2, DOES_NOT_MATTER_DELAY)
     request = FullDuplexRequest(test_pb2)
-    with server, stub:
+    with _CreateService(test_pb2, DOES_NOT_MATTER_DELAY) as (
+        servicer, stub, unused_server):
       with servicer.pause():
         responses = stub.FullDuplexCall(request, SHORT_TIMEOUT)
         with self.assertRaises(exceptions.ExpirationError):
@@ -441,8 +438,7 @@ class PythonPluginTest(unittest.TestCase):
 
   def testFullDuplexCallCancelled(self):
     import test_pb2  # pylint: disable=g-import-not-at-top
-    unused_servicer, stub, server = _CreateService(test_pb2, NO_DELAY)
-    with server, stub:
+    with _CreateService(test_pb2, NO_DELAY) as (servicer, stub, unused_server):
       request = FullDuplexRequest(test_pb2)
       responses = stub.FullDuplexCall(request, NORMAL_TIMEOUT)
       next(responses)
@@ -454,9 +450,9 @@ class PythonPluginTest(unittest.TestCase):
                  'and fix.')
   def testFullDuplexCallFailed(self):
     import test_pb2  # pylint: disable=g-import-not-at-top
-    servicer, stub, server = _CreateService(test_pb2, DOES_NOT_MATTER_DELAY)
     request = FullDuplexRequest(test_pb2)
-    with server, stub:
+    with _CreateService(test_pb2, DOES_NOT_MATTER_DELAY) as (
+        servicer, stub, unused_server):
       with servicer.fail():
         responses = stub.FullDuplexCall(request, NORMAL_TIMEOUT)
         self.assertIsNotNone(responses)
@@ -465,16 +461,16 @@ class PythonPluginTest(unittest.TestCase):
 
   def testHalfDuplexCall(self):
     import test_pb2  # pylint: disable=g-import-not-at-top
-    servicer, stub, server = _CreateService(test_pb2, NO_DELAY)
-    def HalfDuplexRequest():
-      request = test_pb2.StreamingOutputCallRequest()
-      request.response_parameters.add(size=1, interval_us=0)
-      yield request
-      request = test_pb2.StreamingOutputCallRequest()
-      request.response_parameters.add(size=2, interval_us=0)
-      request.response_parameters.add(size=3, interval_us=0)
-      yield request
-    with server, stub:
+    with _CreateService(test_pb2, DOES_NOT_MATTER_DELAY) as (
+        servicer, stub, unused_server):
+      def HalfDuplexRequest():
+        request = test_pb2.StreamingOutputCallRequest()
+        request.response_parameters.add(size=1, interval_us=0)
+        yield request
+        request = test_pb2.StreamingOutputCallRequest()
+        request.response_parameters.add(size=2, interval_us=0)
+        request.response_parameters.add(size=3, interval_us=0)
+        yield request
       responses = stub.HalfDuplexCall(HalfDuplexRequest(), NORMAL_TIMEOUT)
       expected_responses = servicer.HalfDuplexCall(HalfDuplexRequest(), None)
       for check in itertools.izip_longest(expected_responses, responses):
@@ -483,7 +479,6 @@ class PythonPluginTest(unittest.TestCase):
 
   def testHalfDuplexCallWedged(self):
     import test_pb2  # pylint: disable=g-import-not-at-top
-    _, stub, server = _CreateService(test_pb2, NO_DELAY)
     wait_flag = [False]
     @contextlib.contextmanager
     def wait():  # pylint: disable=invalid-name
@@ -497,7 +492,7 @@ class PythonPluginTest(unittest.TestCase):
       yield request
       while wait_flag[0]:
         time.sleep(0.1)
-    with server, stub:
+    with _CreateService(test_pb2, NO_DELAY) as (servicer, stub, unused_server):
       with wait():
         responses = stub.HalfDuplexCall(HalfDuplexRequest(), NORMAL_TIMEOUT)
         # half-duplex waits for the client to send all info
@@ -516,6 +511,5 @@ if __name__ == '__main__':
   parser.add_argument('--port', dest='port', type=int, default=0)
   args, remainder = parser.parse_known_args()
   _build_mode = args.build_mode
-  _port = args.port
   sys.argv[1:] = remainder
   unittest.main()
diff --git a/tools/run_tests/run_python.sh b/tools/run_tests/run_python.sh
index 19f1458fabbb710c6195d974eeac166efa65e661..06ddb8e41ace0e590cccc96691cefce3be3a7414 100755
--- a/tools/run_tests/run_python.sh
+++ b/tools/run_tests/run_python.sh
@@ -38,7 +38,7 @@ export LD_LIBRARY_PATH=$root/libs/opt
 source python2.7_virtual_environment/bin/activate
 # TODO(issue 215): Properly itemize these in run_tests.py so that they can be parallelized.
 # TODO(atash): Enable dynamic unused port discovery for this test.
-python2.7 -B test/compiler/python_plugin_test.py --build_mode=opt --port=40987
+python2.7 -B test/compiler/python_plugin_test.py --build_mode=opt
 python2.7 -B -m grpc._adapter._blocking_invocation_inline_service_test
 python2.7 -B -m grpc._adapter._c_test
 python2.7 -B -m grpc._adapter._event_invocation_synchronous_event_service_test