Skip to content

Commit

Permalink
Merge pull request grpc#7260 from kpayson64/ga_performance_tests
Browse files Browse the repository at this point in the history
Migrated python performance tests to use GA API
  • Loading branch information
kpayson64 authored Jul 8, 2016
2 parents f5d5ad8 + 45c0f2b commit 4e19d66
Show file tree
Hide file tree
Showing 7 changed files with 83 additions and 50 deletions.
37 changes: 18 additions & 19 deletions src/python/grpcio_tests/tests/qps/benchmark_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,32 +37,36 @@
from six.moves import queue

import grpc
from grpc.beta import implementations
from grpc.framework.interfaces.face import face
from src.proto.grpc.testing import messages_pb2
from src.proto.grpc.testing import services_pb2
from tests.unit import resources
from tests.unit.beta import test_utilities
from tests.unit import test_common

_TIMEOUT = 60 * 60 * 24


class GenericStub(object):

def __init__(self, channel):
self.UnaryCall = channel.unary_unary(
'/grpc.testing.BenchmarkService/UnaryCall')
self.StreamingCall = channel.stream_stream(
'/grpc.testing.BenchmarkService/StreamingCall')


class BenchmarkClient:
"""Benchmark client interface that exposes a non-blocking send_request()."""

__metaclass__ = abc.ABCMeta

def __init__(self, server, config, hist):
# Create the stub
host, port = server.split(':')
port = int(port)
if config.HasField('security_params'):
creds = implementations.ssl_channel_credentials(
resources.test_root_certificates())
channel = test_utilities.not_really_secure_channel(
host, port, creds, config.security_params.server_host_override)
creds = grpc.ssl_channel_credentials(resources.test_root_certificates())
channel = test_common.test_secure_channel(
server, creds, config.security_params.server_host_override)
else:
channel = implementations.insecure_channel(host, port)
channel = grpc.insecure_channel(server)

connected_event = threading.Event()
def wait_for_ready(connectivity):
Expand All @@ -73,15 +77,15 @@ def wait_for_ready(connectivity):

if config.payload_config.WhichOneof('payload') == 'simple_params':
self._generic = False
self._stub = services_pb2.beta_create_BenchmarkService_stub(channel)
self._stub = services_pb2.BenchmarkServiceStub(channel)
payload = messages_pb2.Payload(
body='\0' * config.payload_config.simple_params.req_size)
self._request = messages_pb2.SimpleRequest(
payload=payload,
response_size=config.payload_config.simple_params.resp_size)
else:
self._generic = True
self._stub = implementations.generic_stub(channel)
self._stub = GenericStub(channel)
self._request = '\0' * config.payload_config.bytebuf_params.req_size

self._hist = hist
Expand Down Expand Up @@ -166,13 +170,8 @@ def send_request(self):

def start(self):
self._is_streaming = True
if self._generic:
stream_callable = self._stub.stream_stream(
'grpc.testing.BenchmarkService', 'StreamingCall')
else:
stream_callable = self._stub.StreamingCall

response_stream = stream_callable(self._request_generator(), _TIMEOUT)
response_stream = self._stub.StreamingCall(
self._request_generator(), _TIMEOUT)
for _ in response_stream:
self._handle_response(
self, time.time() - self._send_time_queue.get_nowait())
Expand Down
4 changes: 2 additions & 2 deletions src/python/grpcio_tests/tests/qps/benchmark_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
from src.proto.grpc.testing import services_pb2


class BenchmarkServer(services_pb2.BetaBenchmarkServiceServicer):
class BenchmarkServer(services_pb2.BenchmarkServiceServicer):
"""Synchronous Server implementation for the Benchmark service."""

def UnaryCall(self, request, context):
Expand All @@ -44,7 +44,7 @@ def StreamingCall(self, request_iterator, context):
yield messages_pb2.SimpleResponse(payload=payload)


class GenericBenchmarkServer(services_pb2.BetaBenchmarkServiceServicer):
class GenericBenchmarkServer(services_pb2.BenchmarkServiceServicer):
"""Generic Server implementation for the Benchmark service."""

def __init__(self, resp_size):
Expand Down
7 changes: 5 additions & 2 deletions src/python/grpcio_tests/tests/qps/qps_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,18 +32,21 @@
import argparse
import time

from concurrent import futures
import grpc
from src.proto.grpc.testing import services_pb2

from tests.qps import worker_server


def run_worker_server(port):
server = grpc.server((), futures.ThreadPoolExecutor(max_workers=5))
servicer = worker_server.WorkerServer()
server = services_pb2.beta_create_WorkerService_server(servicer)
services_pb2.add_WorkerServiceServicer_to_server(servicer, server)
server.add_insecure_port('[::]:{}'.format(port))
server.start()
servicer.wait_for_quit()
server.stop(2)
server.stop(0)


if __name__ == '__main__':
Expand Down
36 changes: 23 additions & 13 deletions src/python/grpcio_tests/tests/qps/worker_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@
import threading
import time

from grpc.beta import implementations
from grpc.framework.interfaces.face import utilities
from concurrent import futures
import grpc
from src.proto.grpc.testing import control_pb2
from src.proto.grpc.testing import services_pb2
from src.proto.grpc.testing import stats_pb2
Expand All @@ -45,7 +45,7 @@
from tests.unit import resources


class WorkerServer(services_pb2.BetaWorkerServiceServicer):
class WorkerServer(services_pb2.WorkerServiceServicer):
"""Python Worker Server implementation."""

def __init__(self):
Expand All @@ -65,7 +65,7 @@ def RunServer(self, request_iterator, context):
if request.mark.reset:
start_time = end_time
yield status
server.stop(0)
server.stop(None)

def _get_server_status(self, start_time, end_time, port, cores):
end_time = time.time()
Expand All @@ -76,25 +76,35 @@ def _get_server_status(self, start_time, end_time, port, cores):
return control_pb2.ServerStatus(stats=stats, port=port, cores=cores)

def _create_server(self, config):
if config.server_type == control_pb2.SYNC_SERVER:
if config.async_server_threads == 0:
# This is the default concurrent.futures thread pool size, but
# None doesn't seem to work
server_threads = multiprocessing.cpu_count() * 5
else:
server_threads = config.async_server_threads
server = grpc.server((), futures.ThreadPoolExecutor(
max_workers=server_threads))
if config.server_type == control_pb2.ASYNC_SERVER:
servicer = benchmark_server.BenchmarkServer()
server = services_pb2.beta_create_BenchmarkService_server(servicer)
services_pb2.add_BenchmarkServiceServicer_to_server(servicer, server)
elif config.server_type == control_pb2.ASYNC_GENERIC_SERVER:
resp_size = config.payload_config.bytebuf_params.resp_size
servicer = benchmark_server.GenericBenchmarkServer(resp_size)
method_implementations = {
('grpc.testing.BenchmarkService', 'StreamingCall'):
utilities.stream_stream_inline(servicer.StreamingCall),
('grpc.testing.BenchmarkService', 'UnaryCall'):
utilities.unary_unary_inline(servicer.UnaryCall),
'StreamingCall':
grpc.stream_stream_rpc_method_handler(servicer.StreamingCall),
'UnaryCall':
grpc.unary_unary_rpc_method_handler(servicer.UnaryCall),
}
server = implementations.server(method_implementations)
handler = grpc.method_handlers_generic_handler(
'grpc.testing.BenchmarkService', method_implementations)
server.add_generic_rpc_handlers((handler,))
else:
raise Exception('Unsupported server type {}'.format(config.server_type))

if config.HasField('security_params'): # Use SSL
server_creds = implementations.ssl_server_credentials([(
resources.private_key(), resources.certificate_chain())])
server_creds = grpc.ssl_server_credentials(
((resources.private_key(), resources.certificate_chain()),))
port = server.add_secure_port('[::]:{}'.format(config.port), server_creds)
else:
port = server.add_insecure_port('[::]:{}'.format(config.port))
Expand Down
22 changes: 22 additions & 0 deletions src/python/grpcio_tests/tests/unit/test_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@

import collections

import grpc
import six

INVOCATION_INITIAL_METADATA = (('0', 'abc'), ('1', 'def'), ('2', 'ghi'),)
Expand Down Expand Up @@ -78,3 +79,24 @@ def metadata_transmitted(original_metadata, transmitted_metadata):
return False
else:
return True


def test_secure_channel(
target, channel_credentials, server_host_override):
"""Creates an insecure Channel to a remote host.
Args:
host: The name of the remote host to which to connect.
port: The port of the remote host to which to connect.
channel_credentials: The implementations.ChannelCredentials with which to
connect.
server_host_override: The target name used for SSL host name checking.
Returns:
An implementations.Channel to the remote host through which RPCs may be
conducted.
"""
channel = grpc.secure_channel(
target, channel_credentials,
(('grpc.ssl_target_name_override', server_host_override,),))
return channel
2 changes: 1 addition & 1 deletion tools/run_tests/performance/run_worker_python.sh
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,4 @@ set -ex

cd $(dirname $0)/../../..

PYTHONPATH=src/python/grpcio_tests:src/python/grpcio:src/python/gens py27/bin/python src/python/grpcio_tests/tests/qps/qps_worker.py $@
PYTHONPATH=src/python/grpcio_tests:src/python/gens py27/bin/python src/python/grpcio_tests/tests/qps/qps_worker.py $@
25 changes: 12 additions & 13 deletions tools/run_tests/performance/scenario_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -387,45 +387,44 @@ def worker_port_offset(self):
return 500

def scenarios(self):
# TODO(issue #6522): Empty streaming requests does not work for python
#yield _ping_pong_scenario(
# 'python_generic_async_streaming_ping_pong', rpc_type='STREAMING',
# client_type='ASYNC_CLIENT', server_type='ASYNC_GENERIC_SERVER',
# use_generic_payload=True,
# categories=[SMOKETEST])
yield _ping_pong_scenario(
'python_generic_sync_streaming_ping_pong', rpc_type='STREAMING',
client_type='SYNC_CLIENT', server_type='ASYNC_GENERIC_SERVER',
use_generic_payload=True,
categories=[SMOKETEST])

yield _ping_pong_scenario(
'python_protobuf_sync_streaming_ping_pong', rpc_type='STREAMING',
client_type='SYNC_CLIENT', server_type='SYNC_SERVER')
client_type='SYNC_CLIENT', server_type='ASYNC_SERVER')

yield _ping_pong_scenario(
'python_protobuf_async_unary_ping_pong', rpc_type='UNARY',
client_type='ASYNC_CLIENT', server_type='SYNC_SERVER')
client_type='ASYNC_CLIENT', server_type='ASYNC_SERVER')

yield _ping_pong_scenario(
'python_protobuf_sync_unary_ping_pong', rpc_type='UNARY',
client_type='SYNC_CLIENT', server_type='SYNC_SERVER',
client_type='SYNC_CLIENT', server_type='ASYNC_SERVER',
categories=[SMOKETEST])

yield _ping_pong_scenario(
'python_protobuf_sync_unary_qps_unconstrained', rpc_type='UNARY',
client_type='SYNC_CLIENT', server_type='SYNC_SERVER',
client_type='SYNC_CLIENT', server_type='ASYNC_SERVER',
unconstrained_client='sync')

yield _ping_pong_scenario(
'python_protobuf_sync_streaming_qps_unconstrained', rpc_type='STREAMING',
client_type='SYNC_CLIENT', server_type='SYNC_SERVER',
client_type='SYNC_CLIENT', server_type='ASYNC_SERVER',
unconstrained_client='sync')

yield _ping_pong_scenario(
'python_to_cpp_protobuf_sync_unary_ping_pong', rpc_type='UNARY',
client_type='SYNC_CLIENT', server_type='SYNC_SERVER',
client_type='SYNC_CLIENT', server_type='ASYNC_SERVER',
server_language='c++', server_core_limit=1, async_server_threads=1,
categories=[SMOKETEST])

yield _ping_pong_scenario(
'python_to_cpp_protobuf_sync_streaming_ping_pong', rpc_type='STREAMING',
client_type='SYNC_CLIENT', server_type='SYNC_SERVER',
client_type='SYNC_CLIENT', server_type='ASYNC_SERVER',
server_language='c++', server_core_limit=1, async_server_threads=1)

def __str__(self):
Expand Down

0 comments on commit 4e19d66

Please sign in to comment.