diff --git a/src/python/grpcio/tests/_result.py b/src/python/grpcio/tests/_result.py
index 065153f0c305bd769763b9359c9821881c407522..18b0f4396369e28350c7ae59f14cd92fa91982f5 100644
--- a/src/python/grpcio/tests/_result.py
+++ b/src/python/grpcio/tests/_result.py
@@ -204,7 +204,7 @@ class AugmentedResult(unittest.TestResult):
     """
     case_id = self.id_map(test)
     self.cases[case_id] = self.cases[case_id].updated(
-        stdout=stdout, stderr=stderr)
+        stdout=stdout.decode(), stderr=stderr.decode())
 
   def augmented_results(self, filter):
     """Convenience method to retrieve filtered case results.
diff --git a/src/python/grpcio/tests/_runner.py b/src/python/grpcio/tests/_runner.py
index e899154b0bc48e7967a0e64775feedb08cc07048..173a170409b9512a94cf174b9bf5691083677b65 100644
--- a/src/python/grpcio/tests/_runner.py
+++ b/src/python/grpcio/tests/_runner.py
@@ -42,6 +42,7 @@ import time
 import unittest
 import uuid
 
+import six
 from six import moves
 
 from tests import _loader
@@ -95,6 +96,8 @@ class CaptureFile(object):
     Arguments:
       value (str): What to write to the original file.
     """
+    if six.PY3 and not isinstance(value, six.binary_type):
+      value = bytes(value, 'ascii')
     if self._saved_fd is None:
       os.write(self._redirect_fd, value)
     else:
@@ -171,9 +174,9 @@ class Runner(object):
         result.stopTestRun()
         stdout_pipe.write_bypass(result_out.getvalue())
         stdout_pipe.write_bypass(
-            '\ninterrupted stdout:\n{}\n'.format(stdout_pipe.output()))
+            '\ninterrupted stdout:\n{}\n'.format(stdout_pipe.output().decode()))
         stderr_pipe.write_bypass(
-            '\ninterrupted stderr:\n{}\n'.format(stderr_pipe.output()))
+            '\ninterrupted stderr:\n{}\n'.format(stderr_pipe.output().decode()))
         os._exit(1)
     signal.signal(signal.SIGINT, sigint_handler)
     signal.signal(signal.SIGSEGV, fault_handler)
@@ -216,7 +219,7 @@ class Runner(object):
     sys.stdout.write(result_out.getvalue())
     sys.stdout.flush()
     signal.signal(signal.SIGINT, signal.SIG_DFL)
-    with open('report.xml', 'w') as report_xml_file:
+    with open('report.xml', 'wb') as report_xml_file:
       _result.jenkins_junit_xml(result).write(report_xml_file)
     return result
 
diff --git a/src/python/grpcio/tests/interop/methods.py b/src/python/grpcio/tests/interop/methods.py
index 1f5561c1f01fcd02a310e85d113941a933976eaa..7f42b4a005e87f0a4b7263897f5dc1b6b1fe76c5 100644
--- a/src/python/grpcio/tests/interop/methods.py
+++ b/src/python/grpcio/tests/interop/methods.py
@@ -29,6 +29,8 @@
 
 """Implementations of interoperability test methods."""
 
+from __future__ import print_function
+
 import enum
 import json
 import os
@@ -208,7 +210,7 @@ def _ping_pong(stub):
 
   with stub, _Pipe() as pipe:
     response_iterator = stub.FullDuplexCall(pipe, _TIMEOUT)
-    print 'Starting ping-pong with response iterator %s' % response_iterator
+    print('Starting ping-pong with response iterator %s' % response_iterator)
     for response_size, payload_size in zip(
         request_response_sizes, request_payload_sizes):
       request = messages_pb2.StreamingOutputCallRequest(
diff --git a/src/python/grpcio/tests/protoc_plugin/beta_python_plugin_test.py b/src/python/grpcio/tests/protoc_plugin/beta_python_plugin_test.py
index ba5b219a88ef9dd5349d4716370604a067f90cae..230ec6487d2c58c883c7b85cd0780088a9c77718 100644
--- a/src/python/grpcio/tests/protoc_plugin/beta_python_plugin_test.py
+++ b/src/python/grpcio/tests/protoc_plugin/beta_python_plugin_test.py
@@ -1,4 +1,4 @@
-# Copyright 2015, Google Inc.
+# Copyright 2015-2016, Google Inc.
 # All rights reserved.
 #
 # Redistribution and use in source and binary forms, with or without
@@ -42,6 +42,8 @@ import threading
 import time
 import unittest
 
+from six import moves
+
 from grpc.beta import implementations
 from grpc.framework.foundation import future
 from grpc.framework.interfaces.face import face
@@ -250,7 +252,7 @@ class PythonPluginTest(unittest.TestCase):
   def testImportAttributes(self):
     # check that we can access the generated module and its members.
     import protoc_plugin_test_pb2 as test_pb2  # pylint: disable=g-import-not-at-top
-    reload(test_pb2)
+    moves.reload_module(test_pb2)
     self.assertIsNotNone(getattr(test_pb2, SERVICER_IDENTIFIER, None))
     self.assertIsNotNone(getattr(test_pb2, STUB_IDENTIFIER, None))
     self.assertIsNotNone(getattr(test_pb2, SERVER_FACTORY_IDENTIFIER, None))
@@ -258,13 +260,13 @@ class PythonPluginTest(unittest.TestCase):
 
   def testUpDown(self):
     import protoc_plugin_test_pb2 as test_pb2
-    reload(test_pb2)
+    moves.reload_module(test_pb2)
     with _CreateService(test_pb2) as (servicer, stub):
       request = test_pb2.SimpleRequest(response_size=13)
 
   def testUnaryCall(self):
     import protoc_plugin_test_pb2 as test_pb2  # pylint: disable=g-import-not-at-top
-    reload(test_pb2)
+    moves.reload_module(test_pb2)
     with _CreateService(test_pb2) as (methods, stub):
       request = test_pb2.SimpleRequest(response_size=13)
       response = stub.UnaryCall(request, test_constants.LONG_TIMEOUT)
@@ -273,7 +275,7 @@ class PythonPluginTest(unittest.TestCase):
 
   def testUnaryCallFuture(self):
     import protoc_plugin_test_pb2 as test_pb2  # pylint: disable=g-import-not-at-top
-    reload(test_pb2)
+    moves.reload_module(test_pb2)
     request = test_pb2.SimpleRequest(response_size=13)
     with _CreateService(test_pb2) as (methods, stub):
       # Check that the call does not block waiting for the server to respond.
@@ -286,7 +288,7 @@ class PythonPluginTest(unittest.TestCase):
 
   def testUnaryCallFutureExpired(self):
     import protoc_plugin_test_pb2 as test_pb2  # pylint: disable=g-import-not-at-top
-    reload(test_pb2)
+    moves.reload_module(test_pb2)
     with _CreateService(test_pb2) as (methods, stub):
       request = test_pb2.SimpleRequest(response_size=13)
       with methods.pause():
@@ -297,7 +299,7 @@ class PythonPluginTest(unittest.TestCase):
 
   def testUnaryCallFutureCancelled(self):
     import protoc_plugin_test_pb2 as test_pb2  # pylint: disable=g-import-not-at-top
-    reload(test_pb2)
+    moves.reload_module(test_pb2)
     request = test_pb2.SimpleRequest(response_size=13)
     with _CreateService(test_pb2) as (methods, stub):
       with methods.pause():
@@ -307,7 +309,7 @@ class PythonPluginTest(unittest.TestCase):
 
   def testUnaryCallFutureFailed(self):
     import protoc_plugin_test_pb2 as test_pb2  # pylint: disable=g-import-not-at-top
-    reload(test_pb2)
+    moves.reload_module(test_pb2)
     request = test_pb2.SimpleRequest(response_size=13)
     with _CreateService(test_pb2) as (methods, stub):
       with methods.fail():
@@ -317,20 +319,20 @@ class PythonPluginTest(unittest.TestCase):
 
   def testStreamingOutputCall(self):
     import protoc_plugin_test_pb2 as test_pb2  # pylint: disable=g-import-not-at-top
-    reload(test_pb2)
+    moves.reload_module(test_pb2)
     request = _streaming_output_request(test_pb2)
     with _CreateService(test_pb2) as (methods, stub):
       responses = stub.StreamingOutputCall(
           request, test_constants.LONG_TIMEOUT)
       expected_responses = methods.StreamingOutputCall(
           request, 'not a real RpcContext!')
-      for expected_response, response in itertools.izip_longest(
+      for expected_response, response in moves.zip_longest(
           expected_responses, responses):
         self.assertEqual(expected_response, response)
 
   def testStreamingOutputCallExpired(self):
     import protoc_plugin_test_pb2 as test_pb2  # pylint: disable=g-import-not-at-top
-    reload(test_pb2)
+    moves.reload_module(test_pb2)
     request = _streaming_output_request(test_pb2)
     with _CreateService(test_pb2) as (methods, stub):
       with methods.pause():
@@ -341,7 +343,7 @@ class PythonPluginTest(unittest.TestCase):
 
   def testStreamingOutputCallCancelled(self):
     import protoc_plugin_test_pb2 as test_pb2  # pylint: disable=g-import-not-at-top
-    reload(test_pb2)
+    moves.reload_module(test_pb2)
     request = _streaming_output_request(test_pb2)
     with _CreateService(test_pb2) as (unused_methods, stub):
       responses = stub.StreamingOutputCall(
@@ -353,7 +355,7 @@ class PythonPluginTest(unittest.TestCase):
 
   def testStreamingOutputCallFailed(self):
     import protoc_plugin_test_pb2 as test_pb2  # pylint: disable=g-import-not-at-top
-    reload(test_pb2)
+    moves.reload_module(test_pb2)
     request = _streaming_output_request(test_pb2)
     with _CreateService(test_pb2) as (methods, stub):
       with methods.fail():
@@ -364,7 +366,7 @@ class PythonPluginTest(unittest.TestCase):
 
   def testStreamingInputCall(self):
     import protoc_plugin_test_pb2 as test_pb2  # pylint: disable=g-import-not-at-top
-    reload(test_pb2)
+    moves.reload_module(test_pb2)
     with _CreateService(test_pb2) as (methods, stub):
       response = stub.StreamingInputCall(
           _streaming_input_request_iterator(test_pb2),
@@ -375,7 +377,7 @@ class PythonPluginTest(unittest.TestCase):
 
   def testStreamingInputCallFuture(self):
     import protoc_plugin_test_pb2 as test_pb2  # pylint: disable=g-import-not-at-top
-    reload(test_pb2)
+    moves.reload_module(test_pb2)
     with _CreateService(test_pb2) as (methods, stub):
       with methods.pause():
         response_future = stub.StreamingInputCall.future(
@@ -388,7 +390,7 @@ class PythonPluginTest(unittest.TestCase):
 
   def testStreamingInputCallFutureExpired(self):
     import protoc_plugin_test_pb2 as test_pb2  # pylint: disable=g-import-not-at-top
-    reload(test_pb2)
+    moves.reload_module(test_pb2)
     with _CreateService(test_pb2) as (methods, stub):
       with methods.pause():
         response_future = stub.StreamingInputCall.future(
@@ -401,7 +403,7 @@ class PythonPluginTest(unittest.TestCase):
 
   def testStreamingInputCallFutureCancelled(self):
     import protoc_plugin_test_pb2 as test_pb2  # pylint: disable=g-import-not-at-top
-    reload(test_pb2)
+    moves.reload_module(test_pb2)
     with _CreateService(test_pb2) as (methods, stub):
       with methods.pause():
         response_future = stub.StreamingInputCall.future(
@@ -414,7 +416,7 @@ class PythonPluginTest(unittest.TestCase):
 
   def testStreamingInputCallFutureFailed(self):
     import protoc_plugin_test_pb2 as test_pb2  # pylint: disable=g-import-not-at-top
-    reload(test_pb2)
+    moves.reload_module(test_pb2)
     with _CreateService(test_pb2) as (methods, stub):
       with methods.fail():
         response_future = stub.StreamingInputCall.future(
@@ -424,19 +426,19 @@ class PythonPluginTest(unittest.TestCase):
 
   def testFullDuplexCall(self):
     import protoc_plugin_test_pb2 as test_pb2  # pylint: disable=g-import-not-at-top
-    reload(test_pb2)
+    moves.reload_module(test_pb2)
     with _CreateService(test_pb2) as (methods, stub):
       responses = stub.FullDuplexCall(
           _full_duplex_request_iterator(test_pb2), test_constants.LONG_TIMEOUT)
       expected_responses = methods.FullDuplexCall(
           _full_duplex_request_iterator(test_pb2), 'not a real RpcContext!')
-      for expected_response, response in itertools.izip_longest(
+      for expected_response, response in moves.zip_longest(
           expected_responses, responses):
         self.assertEqual(expected_response, response)
 
   def testFullDuplexCallExpired(self):
     import protoc_plugin_test_pb2 as test_pb2  # pylint: disable=g-import-not-at-top
-    reload(test_pb2)
+    moves.reload_module(test_pb2)
     request_iterator = _full_duplex_request_iterator(test_pb2)
     with _CreateService(test_pb2) as (methods, stub):
       with methods.pause():
@@ -447,7 +449,7 @@ class PythonPluginTest(unittest.TestCase):
 
   def testFullDuplexCallCancelled(self):
     import protoc_plugin_test_pb2 as test_pb2  # pylint: disable=g-import-not-at-top
-    reload(test_pb2)
+    moves.reload_module(test_pb2)
     with _CreateService(test_pb2) as (methods, stub):
       request_iterator = _full_duplex_request_iterator(test_pb2)
       responses = stub.FullDuplexCall(
@@ -459,7 +461,7 @@ class PythonPluginTest(unittest.TestCase):
 
   def testFullDuplexCallFailed(self):
     import protoc_plugin_test_pb2 as test_pb2  # pylint: disable=g-import-not-at-top
-    reload(test_pb2)
+    moves.reload_module(test_pb2)
     request_iterator = _full_duplex_request_iterator(test_pb2)
     with _CreateService(test_pb2) as (methods, stub):
       with methods.fail():
@@ -471,7 +473,7 @@ class PythonPluginTest(unittest.TestCase):
 
   def testHalfDuplexCall(self):
     import protoc_plugin_test_pb2 as test_pb2  # pylint: disable=g-import-not-at-top
-    reload(test_pb2)
+    moves.reload_module(test_pb2)
     with _CreateService(test_pb2) as (methods, stub):
       def half_duplex_request_iterator():
         request = test_pb2.StreamingOutputCallRequest()
@@ -485,13 +487,13 @@ class PythonPluginTest(unittest.TestCase):
           half_duplex_request_iterator(), test_constants.LONG_TIMEOUT)
       expected_responses = methods.HalfDuplexCall(
           half_duplex_request_iterator(), 'not a real RpcContext!')
-      for check in itertools.izip_longest(expected_responses, responses):
+      for check in moves.zip_longest(expected_responses, responses):
         expected_response, response = check
         self.assertEqual(expected_response, response)
 
   def testHalfDuplexCallWedged(self):
     import protoc_plugin_test_pb2 as test_pb2  # pylint: disable=g-import-not-at-top
-    reload(test_pb2)
+    moves.reload_module(test_pb2)
     condition = threading.Condition()
     wait_cell = [False]
     @contextlib.contextmanager
diff --git a/src/python/grpcio/tests/unit/_adapter/_intermediary_low_test.py b/src/python/grpcio/tests/unit/_adapter/_intermediary_low_test.py
index a6fd82388c014621c50e68749d68c224aff93c94..06bfc34977cdf09430a54defe663a02f0d37269a 100644
--- a/src/python/grpcio/tests/unit/_adapter/_intermediary_low_test.py
+++ b/src/python/grpcio/tests/unit/_adapter/_intermediary_low_test.py
@@ -1,4 +1,4 @@
-# Copyright 2015, Google Inc.
+# Copyright 2015-2016, Google Inc.
 # All rights reserved.
 #
 # Redistribution and use in source and binary forms, with or without
@@ -29,11 +29,13 @@
 
 """Tests for the old '_low'."""
 
-import Queue
 import threading
 import time
 import unittest
 
+import six
+from six.moves import queue
+
 from grpc._adapter import _intermediary_low as _low
 
 _STREAM_LENGTH = 300
@@ -67,7 +69,7 @@ class LonelyClientTest(unittest.TestCase):
     second_event = completion_queue.get(after_deadline)
     self.assertIsNotNone(second_event)
     kinds = [event.kind for event in (first_event, second_event)]
-    self.assertItemsEqual(
+    six.assertCountEqual(self,
         (_low.Event.Kind.METADATA_ACCEPTED, _low.Event.Kind.FINISH),
         kinds)
 
@@ -99,7 +101,7 @@ class EchoTest(unittest.TestCase):
     self.server = _low.Server(self.server_completion_queue)
     port = self.server.add_http2_addr('[::]:0')
     self.server.start()
-    self.server_events = Queue.Queue()
+    self.server_events = queue.Queue()
     self.server_completion_queue_thread = threading.Thread(
         target=_drive_completion_queue,
         args=(self.server_completion_queue, self.server_events))
@@ -107,7 +109,7 @@ class EchoTest(unittest.TestCase):
 
     self.client_completion_queue = _low.CompletionQueue()
     self.channel = _low.Channel('%s:%d' % (self.host, port), None)
-    self.client_events = Queue.Queue()
+    self.client_events = queue.Queue()
     self.client_completion_queue_thread = threading.Thread(
         target=_drive_completion_queue,
         args=(self.client_completion_queue, self.client_events))
@@ -315,7 +317,7 @@ class CancellationTest(unittest.TestCase):
     self.server = _low.Server(self.server_completion_queue)
     port = self.server.add_http2_addr('[::]:0')
     self.server.start()
-    self.server_events = Queue.Queue()
+    self.server_events = queue.Queue()
     self.server_completion_queue_thread = threading.Thread(
         target=_drive_completion_queue,
         args=(self.server_completion_queue, self.server_events))
@@ -323,7 +325,7 @@ class CancellationTest(unittest.TestCase):
 
     self.client_completion_queue = _low.CompletionQueue()
     self.channel = _low.Channel('%s:%d' % (self.host, port), None)
-    self.client_events = Queue.Queue()
+    self.client_events = queue.Queue()
     self.client_completion_queue_thread = threading.Thread(
         target=_drive_completion_queue,
         args=(self.client_completion_queue, self.client_events))
diff --git a/src/python/grpcio/tests/unit/framework/interfaces/base/_control.py b/src/python/grpcio/tests/unit/framework/interfaces/base/_control.py
index fe69e63995bb40e904fd18f1027c2c1346489a58..94bcc1428e2649578e0be8e37f6825cecb6e534c 100644
--- a/src/python/grpcio/tests/unit/framework/interfaces/base/_control.py
+++ b/src/python/grpcio/tests/unit/framework/interfaces/base/_control.py
@@ -29,6 +29,8 @@
 
 """Part of the tests of the base interface of RPC Framework."""
 
+from __future__ import division
+
 import abc
 import collections
 import enum
@@ -47,8 +49,8 @@ from tests.unit.framework.interfaces.base import test_interfaces  # pylint: disa
 _GROUP = 'base test cases test group'
 _METHOD = 'base test cases test method'
 
-_PAYLOAD_RANDOM_SECTION_MAXIMUM_SIZE = test_constants.PAYLOAD_SIZE / 20
-_MINIMUM_PAYLOAD_SIZE = test_constants.PAYLOAD_SIZE / 600
+_PAYLOAD_RANDOM_SECTION_MAXIMUM_SIZE = test_constants.PAYLOAD_SIZE // 20
+_MINIMUM_PAYLOAD_SIZE = test_constants.PAYLOAD_SIZE // 600
 
 
 def _create_payload(randomness):
@@ -59,7 +61,7 @@ def _create_payload(randomness):
   random_section = bytes(
       bytearray(
           randomness.getrandbits(8) for _ in range(random_section_length)))
-  sevens_section = '\x07' * (length - random_section_length)
+  sevens_section = b'\x07' * (length - random_section_length)
   return b''.join(randomness.sample((random_section, sevens_section), 2))
 
 
@@ -385,13 +387,13 @@ class _SequenceController(Controller):
     return request + request
 
   def deserialize_request(self, serialized_request):
-    return serialized_request[:len(serialized_request) / 2]
+    return serialized_request[:len(serialized_request) // 2]
 
   def serialize_response(self, response):
     return response * 3
 
   def deserialize_response(self, serialized_response):
-    return serialized_response[2 * len(serialized_response) / 3:]
+    return serialized_response[2 * len(serialized_response) // 3:]
 
   def invocation(self):
     with self._condition:
diff --git a/src/python/grpcio/tests/unit/framework/interfaces/face/_blocking_invocation_inline_service.py b/src/python/grpcio/tests/unit/framework/interfaces/face/_blocking_invocation_inline_service.py
index 530ba4ff0f8c539211083e9514102c16840eece5..936b87f59761a8d1dabf7967c7a3cacb1b1c1b4a 100644
--- a/src/python/grpcio/tests/unit/framework/interfaces/face/_blocking_invocation_inline_service.py
+++ b/src/python/grpcio/tests/unit/framework/interfaces/face/_blocking_invocation_inline_service.py
@@ -29,6 +29,8 @@
 
 """Test code for the Face layer of RPC Framework."""
 
+from __future__ import division
+
 import abc
 import itertools
 import unittest
@@ -182,7 +184,7 @@ class TestCase(six.with_metaclass(abc.ABCMeta, test_coverage.Coverage, unittest.
 
         some_completed_response_futures_iterator = itertools.islice(
             futures.as_completed(response_futures_to_indices),
-            test_constants.PARALLELISM / 2)
+            test_constants.PARALLELISM // 2)
         for response_future in some_completed_response_futures_iterator:
           index = response_futures_to_indices[response_future]
           test_messages.verify(requests[index], response_future.result(), self)
diff --git a/src/python/grpcio/tests/unit/framework/interfaces/face/_future_invocation_asynchronous_event_service.py b/src/python/grpcio/tests/unit/framework/interfaces/face/_future_invocation_asynchronous_event_service.py
index 89f344c289848897e7cc0f36b50c81f3fcdebd29..401b52f6143bae0a70a704adf2191d272a043ddd 100644
--- a/src/python/grpcio/tests/unit/framework/interfaces/face/_future_invocation_asynchronous_event_service.py
+++ b/src/python/grpcio/tests/unit/framework/interfaces/face/_future_invocation_asynchronous_event_service.py
@@ -29,6 +29,8 @@
 
 """Test code for the Face layer of RPC Framework."""
 
+from __future__ import division
+
 import abc
 import contextlib
 import itertools
@@ -277,7 +279,7 @@ class TestCase(six.with_metaclass(abc.ABCMeta, test_coverage.Coverage, unittest.
 
         some_completed_response_futures_iterator = itertools.islice(
             futures.as_completed(response_futures_to_indices),
-            test_constants.PARALLELISM / 2)
+            test_constants.PARALLELISM // 2)
         for response_future in some_completed_response_futures_iterator:
           index = response_futures_to_indices[response_future]
           test_messages.verify(requests[index], response_future.result(), self)