From 45c0f2b3051bf1642337e109df57e8031cb654c8 Mon Sep 17 00:00:00 2001 From: Ken Payson Date: Wed, 6 Jul 2016 19:26:09 -0700 Subject: [PATCH] Migrated python performance tests to use GA API --- .../tests/qps/benchmark_client.py | 37 +++++++++---------- .../tests/qps/benchmark_server.py | 4 +- .../grpcio_tests/tests/qps/qps_worker.py | 7 +++- .../grpcio_tests/tests/qps/worker_server.py | 36 +++++++++++------- .../grpcio_tests/tests/unit/test_common.py | 22 +++++++++++ .../performance/run_worker_python.sh | 2 +- .../run_tests/performance/scenario_config.py | 25 ++++++------- 7 files changed, 83 insertions(+), 50 deletions(-) diff --git a/src/python/grpcio_tests/tests/qps/benchmark_client.py b/src/python/grpcio_tests/tests/qps/benchmark_client.py index 080281415d470..83b46c914e7f8 100644 --- a/src/python/grpcio_tests/tests/qps/benchmark_client.py +++ b/src/python/grpcio_tests/tests/qps/benchmark_client.py @@ -37,16 +37,23 @@ from six.moves import queue import grpc -from grpc.beta import implementations -from grpc.framework.interfaces.face import face from src.proto.grpc.testing import messages_pb2 from src.proto.grpc.testing import services_pb2 from tests.unit import resources -from tests.unit.beta import test_utilities +from tests.unit import test_common _TIMEOUT = 60 * 60 * 24 +class GenericStub(object): + + def __init__(self, channel): + self.UnaryCall = channel.unary_unary( + '/grpc.testing.BenchmarkService/UnaryCall') + self.StreamingCall = channel.stream_stream( + '/grpc.testing.BenchmarkService/StreamingCall') + + class BenchmarkClient: """Benchmark client interface that exposes a non-blocking send_request().""" @@ -54,15 +61,12 @@ class BenchmarkClient: def __init__(self, server, config, hist): # Create the stub - host, port = server.split(':') - port = int(port) if config.HasField('security_params'): - creds = implementations.ssl_channel_credentials( - resources.test_root_certificates()) - channel = test_utilities.not_really_secure_channel( - host, port, creds, config.security_params.server_host_override) + creds = grpc.ssl_channel_credentials(resources.test_root_certificates()) + channel = test_common.test_secure_channel( + server, creds, config.security_params.server_host_override) else: - channel = implementations.insecure_channel(host, port) + channel = grpc.insecure_channel(server) connected_event = threading.Event() def wait_for_ready(connectivity): @@ -73,7 +77,7 @@ def wait_for_ready(connectivity): if config.payload_config.WhichOneof('payload') == 'simple_params': self._generic = False - self._stub = services_pb2.beta_create_BenchmarkService_stub(channel) + self._stub = services_pb2.BenchmarkServiceStub(channel) payload = messages_pb2.Payload( body='\0' * config.payload_config.simple_params.req_size) self._request = messages_pb2.SimpleRequest( @@ -81,7 +85,7 @@ def wait_for_ready(connectivity): response_size=config.payload_config.simple_params.resp_size) else: self._generic = True - self._stub = implementations.generic_stub(channel) + self._stub = GenericStub(channel) self._request = '\0' * config.payload_config.bytebuf_params.req_size self._hist = hist @@ -166,13 +170,8 @@ def send_request(self): def start(self): self._is_streaming = True - if self._generic: - stream_callable = self._stub.stream_stream( - 'grpc.testing.BenchmarkService', 'StreamingCall') - else: - stream_callable = self._stub.StreamingCall - - response_stream = stream_callable(self._request_generator(), _TIMEOUT) + response_stream = self._stub.StreamingCall( + self._request_generator(), _TIMEOUT) for _ in response_stream: self._handle_response( self, time.time() - self._send_time_queue.get_nowait()) diff --git a/src/python/grpcio_tests/tests/qps/benchmark_server.py b/src/python/grpcio_tests/tests/qps/benchmark_server.py index 8cbf480d58bb8..2b76b810cdd86 100644 --- a/src/python/grpcio_tests/tests/qps/benchmark_server.py +++ b/src/python/grpcio_tests/tests/qps/benchmark_server.py @@ -31,7 +31,7 @@ from src.proto.grpc.testing import services_pb2 -class BenchmarkServer(services_pb2.BetaBenchmarkServiceServicer): +class BenchmarkServer(services_pb2.BenchmarkServiceServicer): """Synchronous Server implementation for the Benchmark service.""" def UnaryCall(self, request, context): @@ -44,7 +44,7 @@ def StreamingCall(self, request_iterator, context): yield messages_pb2.SimpleResponse(payload=payload) -class GenericBenchmarkServer(services_pb2.BetaBenchmarkServiceServicer): +class GenericBenchmarkServer(services_pb2.BenchmarkServiceServicer): """Generic Server implementation for the Benchmark service.""" def __init__(self, resp_size): diff --git a/src/python/grpcio_tests/tests/qps/qps_worker.py b/src/python/grpcio_tests/tests/qps/qps_worker.py index 16926379a5bca..3abf0d08dd9c2 100644 --- a/src/python/grpcio_tests/tests/qps/qps_worker.py +++ b/src/python/grpcio_tests/tests/qps/qps_worker.py @@ -32,18 +32,21 @@ import argparse import time +from concurrent import futures +import grpc from src.proto.grpc.testing import services_pb2 from tests.qps import worker_server def run_worker_server(port): + server = grpc.server((), futures.ThreadPoolExecutor(max_workers=5)) servicer = worker_server.WorkerServer() - server = services_pb2.beta_create_WorkerService_server(servicer) + services_pb2.add_WorkerServiceServicer_to_server(servicer, server) server.add_insecure_port('[::]:{}'.format(port)) server.start() servicer.wait_for_quit() - server.stop(2) + server.stop(0) if __name__ == '__main__': diff --git a/src/python/grpcio_tests/tests/qps/worker_server.py b/src/python/grpcio_tests/tests/qps/worker_server.py index d41f8377c2a11..932a1ffe2b4b7 100644 --- a/src/python/grpcio_tests/tests/qps/worker_server.py +++ b/src/python/grpcio_tests/tests/qps/worker_server.py @@ -32,8 +32,8 @@ import threading import time -from grpc.beta import implementations -from grpc.framework.interfaces.face import utilities +from concurrent import futures +import grpc from src.proto.grpc.testing import control_pb2 from src.proto.grpc.testing import services_pb2 from src.proto.grpc.testing import stats_pb2 @@ -45,7 +45,7 @@ from tests.unit import resources -class WorkerServer(services_pb2.BetaWorkerServiceServicer): +class WorkerServer(services_pb2.WorkerServiceServicer): """Python Worker Server implementation.""" def __init__(self): @@ -65,7 +65,7 @@ def RunServer(self, request_iterator, context): if request.mark.reset: start_time = end_time yield status - server.stop(0) + server.stop(None) def _get_server_status(self, start_time, end_time, port, cores): end_time = time.time() @@ -76,25 +76,35 @@ def _get_server_status(self, start_time, end_time, port, cores): return control_pb2.ServerStatus(stats=stats, port=port, cores=cores) def _create_server(self, config): - if config.server_type == control_pb2.SYNC_SERVER: + if config.async_server_threads == 0: + # This is the default concurrent.futures thread pool size, but + # None doesn't seem to work + server_threads = multiprocessing.cpu_count() * 5 + else: + server_threads = config.async_server_threads + server = grpc.server((), futures.ThreadPoolExecutor( + max_workers=server_threads)) + if config.server_type == control_pb2.ASYNC_SERVER: servicer = benchmark_server.BenchmarkServer() - server = services_pb2.beta_create_BenchmarkService_server(servicer) + services_pb2.add_BenchmarkServiceServicer_to_server(servicer, server) elif config.server_type == control_pb2.ASYNC_GENERIC_SERVER: resp_size = config.payload_config.bytebuf_params.resp_size servicer = benchmark_server.GenericBenchmarkServer(resp_size) method_implementations = { - ('grpc.testing.BenchmarkService', 'StreamingCall'): - utilities.stream_stream_inline(servicer.StreamingCall), - ('grpc.testing.BenchmarkService', 'UnaryCall'): - utilities.unary_unary_inline(servicer.UnaryCall), + 'StreamingCall': + grpc.stream_stream_rpc_method_handler(servicer.StreamingCall), + 'UnaryCall': + grpc.unary_unary_rpc_method_handler(servicer.UnaryCall), } - server = implementations.server(method_implementations) + handler = grpc.method_handlers_generic_handler( + 'grpc.testing.BenchmarkService', method_implementations) + server.add_generic_rpc_handlers((handler,)) else: raise Exception('Unsupported server type {}'.format(config.server_type)) if config.HasField('security_params'): # Use SSL - server_creds = implementations.ssl_server_credentials([( - resources.private_key(), resources.certificate_chain())]) + server_creds = grpc.ssl_server_credentials( + ((resources.private_key(), resources.certificate_chain()),)) port = server.add_secure_port('[::]:{}'.format(config.port), server_creds) else: port = server.add_insecure_port('[::]:{}'.format(config.port)) diff --git a/src/python/grpcio_tests/tests/unit/test_common.py b/src/python/grpcio_tests/tests/unit/test_common.py index c8886bf4ca567..cd71bd80d7546 100644 --- a/src/python/grpcio_tests/tests/unit/test_common.py +++ b/src/python/grpcio_tests/tests/unit/test_common.py @@ -31,6 +31,7 @@ import collections +import grpc import six INVOCATION_INITIAL_METADATA = (('0', 'abc'), ('1', 'def'), ('2', 'ghi'),) @@ -78,3 +79,24 @@ def metadata_transmitted(original_metadata, transmitted_metadata): return False else: return True + + +def test_secure_channel( + target, channel_credentials, server_host_override): + """Creates an insecure Channel to a remote host. + + Args: + host: The name of the remote host to which to connect. + port: The port of the remote host to which to connect. + channel_credentials: The implementations.ChannelCredentials with which to + connect. + server_host_override: The target name used for SSL host name checking. + + Returns: + An implementations.Channel to the remote host through which RPCs may be + conducted. + """ + channel = grpc.secure_channel( + target, channel_credentials, + (('grpc.ssl_target_name_override', server_host_override,),)) + return channel diff --git a/tools/run_tests/performance/run_worker_python.sh b/tools/run_tests/performance/run_worker_python.sh index 3b8ba6f4e4a32..06cf172d6fece 100755 --- a/tools/run_tests/performance/run_worker_python.sh +++ b/tools/run_tests/performance/run_worker_python.sh @@ -32,4 +32,4 @@ set -ex cd $(dirname $0)/../../.. -PYTHONPATH=src/python/grpcio_tests:src/python/grpcio:src/python/gens py27/bin/python src/python/grpcio_tests/tests/qps/qps_worker.py $@ +PYTHONPATH=src/python/grpcio_tests:src/python/gens py27/bin/python src/python/grpcio_tests/tests/qps/qps_worker.py $@ diff --git a/tools/run_tests/performance/scenario_config.py b/tools/run_tests/performance/scenario_config.py index 2d5130e1e8682..4dfd01fc6636c 100644 --- a/tools/run_tests/performance/scenario_config.py +++ b/tools/run_tests/performance/scenario_config.py @@ -387,45 +387,44 @@ def worker_port_offset(self): return 500 def scenarios(self): - # TODO(issue #6522): Empty streaming requests does not work for python - #yield _ping_pong_scenario( - # 'python_generic_async_streaming_ping_pong', rpc_type='STREAMING', - # client_type='ASYNC_CLIENT', server_type='ASYNC_GENERIC_SERVER', - # use_generic_payload=True, - # categories=[SMOKETEST]) + yield _ping_pong_scenario( + 'python_generic_sync_streaming_ping_pong', rpc_type='STREAMING', + client_type='SYNC_CLIENT', server_type='ASYNC_GENERIC_SERVER', + use_generic_payload=True, + categories=[SMOKETEST]) yield _ping_pong_scenario( 'python_protobuf_sync_streaming_ping_pong', rpc_type='STREAMING', - client_type='SYNC_CLIENT', server_type='SYNC_SERVER') + client_type='SYNC_CLIENT', server_type='ASYNC_SERVER') yield _ping_pong_scenario( 'python_protobuf_async_unary_ping_pong', rpc_type='UNARY', - client_type='ASYNC_CLIENT', server_type='SYNC_SERVER') + client_type='ASYNC_CLIENT', server_type='ASYNC_SERVER') yield _ping_pong_scenario( 'python_protobuf_sync_unary_ping_pong', rpc_type='UNARY', - client_type='SYNC_CLIENT', server_type='SYNC_SERVER', + client_type='SYNC_CLIENT', server_type='ASYNC_SERVER', categories=[SMOKETEST]) yield _ping_pong_scenario( 'python_protobuf_sync_unary_qps_unconstrained', rpc_type='UNARY', - client_type='SYNC_CLIENT', server_type='SYNC_SERVER', + client_type='SYNC_CLIENT', server_type='ASYNC_SERVER', unconstrained_client='sync') yield _ping_pong_scenario( 'python_protobuf_sync_streaming_qps_unconstrained', rpc_type='STREAMING', - client_type='SYNC_CLIENT', server_type='SYNC_SERVER', + client_type='SYNC_CLIENT', server_type='ASYNC_SERVER', unconstrained_client='sync') yield _ping_pong_scenario( 'python_to_cpp_protobuf_sync_unary_ping_pong', rpc_type='UNARY', - client_type='SYNC_CLIENT', server_type='SYNC_SERVER', + client_type='SYNC_CLIENT', server_type='ASYNC_SERVER', server_language='c++', server_core_limit=1, async_server_threads=1, categories=[SMOKETEST]) yield _ping_pong_scenario( 'python_to_cpp_protobuf_sync_streaming_ping_pong', rpc_type='STREAMING', - client_type='SYNC_CLIENT', server_type='SYNC_SERVER', + client_type='SYNC_CLIENT', server_type='ASYNC_SERVER', server_language='c++', server_core_limit=1, async_server_threads=1) def __str__(self):