diff --git a/src/python/grpcio/tests/qps/benchmark_client.py b/src/python/grpcio/tests/qps/benchmark_client.py index b372ea01ade53c26706a366c8cf12bf10d0ad6f5..e2922347f98bfd673b69dfc6011cb60328c2d5a8 100644 --- a/src/python/grpcio/tests/qps/benchmark_client.py +++ b/src/python/grpcio/tests/qps/benchmark_client.py @@ -82,6 +82,7 @@ class BenchmarkClient: self._response_callbacks = [] def add_response_callback(self, callback): + """callback will be invoked as callback(client, query_time)""" self._response_callbacks.append(callback) @abc.abstractmethod @@ -95,10 +96,10 @@ class BenchmarkClient: def stop(self): pass - def _handle_response(self, query_time): + def _handle_response(self, client, query_time): self._hist.add(query_time * 1e9) # Report times in nanoseconds for callback in self._response_callbacks: - callback(query_time) + callback(client, query_time) class UnarySyncBenchmarkClient(BenchmarkClient): @@ -121,7 +122,7 @@ class UnarySyncBenchmarkClient(BenchmarkClient): start_time = time.time() self._stub.UnaryCall(self._request, _TIMEOUT) end_time = time.time() - self._handle_response(end_time - start_time) + self._handle_response(self, end_time - start_time) class UnaryAsyncBenchmarkClient(BenchmarkClient): @@ -136,19 +137,20 @@ class UnaryAsyncBenchmarkClient(BenchmarkClient): def _response_received(self, start_time, resp): resp.result() end_time = time.time() - self._handle_response(end_time - start_time) + self._handle_response(self, end_time - start_time) def stop(self): self._stub = None -class StreamingSyncBenchmarkClient(BenchmarkClient): +class _SyncStream(object): - def __init__(self, server, config, hist): - super(StreamingSyncBenchmarkClient, self).__init__(server, config, hist) + def __init__(self, stub, generic, request, handle_response): + self._stub = stub + self._generic = generic + self._request = request + self._handle_response = handle_response self._is_streaming = False - self._pool = futures.ThreadPoolExecutor(max_workers=1) - # Use a thread-safe queue to put requests on the stream self._request_queue = queue.Queue() self._send_time_queue = queue.Queue() @@ -157,15 +159,6 @@ class StreamingSyncBenchmarkClient(BenchmarkClient): self._request_queue.put(self._request) def start(self): - self._is_streaming = True - self._pool.submit(self._request_stream) - - def stop(self): - self._is_streaming = False - self._pool.shutdown(wait=True) - self._stub = None - - def _request_stream(self): self._is_streaming = True if self._generic: stream_callable = self._stub.stream_stream( @@ -175,8 +168,11 @@ class StreamingSyncBenchmarkClient(BenchmarkClient): response_stream = stream_callable(self._request_generator(), _TIMEOUT) for _ in response_stream: - end_time = time.time() - self._handle_response(end_time - self._send_time_queue.get_nowait()) + self._handle_response( + self, time.time() - self._send_time_queue.get_nowait()) + + def stop(self): + self._is_streaming = False def _request_generator(self): while self._is_streaming: @@ -187,46 +183,28 @@ class StreamingSyncBenchmarkClient(BenchmarkClient): pass -class AsyncReceiver(face.ResponseReceiver): - """Receiver for async stream responses.""" - - def __init__(self, send_time_queue, response_handler): - self._send_time_queue = send_time_queue - self._response_handler = response_handler - - def initial_metadata(self, initial_mdetadata): - pass - - def response(self, response): - end_time = time.time() - self._response_handler(end_time - self._send_time_queue.get_nowait()) - - def complete(self, terminal_metadata, code, details): - pass - - -class StreamingAsyncBenchmarkClient(BenchmarkClient): +class StreamingSyncBenchmarkClient(BenchmarkClient): def __init__(self, server, config, hist): - super(StreamingAsyncBenchmarkClient, self).__init__(server, config, hist) - self._send_time_queue = queue.Queue() - self._receiver = AsyncReceiver(self._send_time_queue, self._handle_response) - self._rendezvous = None + super(StreamingSyncBenchmarkClient, self).__init__(server, config, hist) + self._pool = futures.ThreadPoolExecutor( + max_workers=config.outstanding_rpcs_per_channel) + self._streams = [_SyncStream(self._stub, self._generic, + self._request, self._handle_response) + for _ in xrange(config.outstanding_rpcs_per_channel)] + self._curr_stream = 0 def send_request(self): - if self._rendezvous is not None: - self._send_time_queue.put(time.time()) - self._rendezvous.consume(self._request) + # Use a round_robin scheduler to determine what stream to send on + self._streams[self._curr_stream].send_request() + self._curr_stream = (self._curr_stream + 1) % len(self._streams) def start(self): - if self._generic: - stream_callable = self._stub.stream_stream( - 'grpc.testing.BenchmarkService', 'StreamingCall') - else: - stream_callable = self._stub.StreamingCall - self._rendezvous = stream_callable.event( - self._receiver, lambda *args: None, _TIMEOUT) + for stream in self._streams: + self._pool.submit(stream.start) def stop(self): - self._rendezvous.terminate() - self._rendezvous = None + for stream in self._streams: + stream.stop() + self._pool.shutdown(wait=True) + self._stub = None diff --git a/src/python/grpcio/tests/qps/client_runner.py b/src/python/grpcio/tests/qps/client_runner.py index 1ede7d2af1b67428facb05dce41010d03beade3c..2d1d981733bf0e1a482ea471f826c809d317da57 100644 --- a/src/python/grpcio/tests/qps/client_runner.py +++ b/src/python/grpcio/tests/qps/client_runner.py @@ -98,7 +98,6 @@ class ClosedLoopClientRunner(ClientRunner): self._client.stop() self._client = None - def _send_request(self, response_time): + def _send_request(self, client, response_time): if self._is_running: - self._client.send_request() - + client.send_request() diff --git a/src/python/grpcio/tests/qps/qps_worker.py b/src/python/grpcio/tests/qps/qps_worker.py index 3dda718638e211b0499d6c207b5b4ddb8d55cc0c..16926379a5bca47a5553456a47018af9b784f337 100644 --- a/src/python/grpcio/tests/qps/qps_worker.py +++ b/src/python/grpcio/tests/qps/qps_worker.py @@ -43,9 +43,7 @@ def run_worker_server(port): server.add_insecure_port('[::]:{}'.format(port)) server.start() servicer.wait_for_quit() - # Drain outstanding requests for clean exit - time.sleep(2) - server.stop(0) + server.stop(2) if __name__ == '__main__': diff --git a/src/python/grpcio/tests/qps/worker_server.py b/src/python/grpcio/tests/qps/worker_server.py index 1f9af5482ccc451fbc456656f02f4502d9293586..d41f8377c2a118202707550587212a74e15e760e 100644 --- a/src/python/grpcio/tests/qps/worker_server.py +++ b/src/python/grpcio/tests/qps/worker_server.py @@ -153,9 +153,8 @@ class WorkerServer(services_pb2.BetaWorkerServiceServicer): if config.rpc_type == control_pb2.UNARY: client = benchmark_client.UnaryAsyncBenchmarkClient( server, config, qps_data) - elif config.rpc_type == control_pb2.STREAMING: - client = benchmark_client.StreamingAsyncBenchmarkClient( - server, config, qps_data) + else: + raise Exception('Async streaming client not supported') else: raise Exception('Unsupported client type {}'.format(config.client_type)) diff --git a/tools/run_tests/performance/scenario_config.py b/tools/run_tests/performance/scenario_config.py index b55d728d840fa71ed4ad385617b36989af135d74..2d5130e1e8682f1fa084cf5a1918ffbbfd97681e 100644 --- a/tools/run_tests/performance/scenario_config.py +++ b/tools/run_tests/performance/scenario_config.py @@ -395,8 +395,8 @@ class PythonLanguage: # categories=[SMOKETEST]) yield _ping_pong_scenario( - 'python_protobuf_async_streaming_ping_pong', rpc_type='STREAMING', - client_type='ASYNC_CLIENT', server_type='SYNC_SERVER') + 'python_protobuf_sync_streaming_ping_pong', rpc_type='STREAMING', + client_type='SYNC_CLIENT', server_type='SYNC_SERVER') yield _ping_pong_scenario( 'python_protobuf_async_unary_ping_pong', rpc_type='UNARY', @@ -413,9 +413,9 @@ class PythonLanguage: unconstrained_client='sync') yield _ping_pong_scenario( - 'python_protobuf_async_streaming_qps_unconstrained', rpc_type='STREAMING', - client_type='ASYNC_CLIENT', server_type='SYNC_SERVER', - unconstrained_client='async') + 'python_protobuf_sync_streaming_qps_unconstrained', rpc_type='STREAMING', + client_type='SYNC_CLIENT', server_type='SYNC_SERVER', + unconstrained_client='sync') yield _ping_pong_scenario( 'python_to_cpp_protobuf_sync_unary_ping_pong', rpc_type='UNARY',