Skip to content

Commit

Permalink
A Cython-layer read-not-all-responses test
Browse files Browse the repository at this point in the history
  • Loading branch information
nathanielmanistaatgoogle committed Jun 3, 2016
1 parent 84f7193 commit debd151
Show file tree
Hide file tree
Showing 2 changed files with 252 additions and 0 deletions.
1 change: 1 addition & 0 deletions src/python/grpcio/tests/tests.json
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
"_low_test.HangingServerShutdown",
"_low_test.InsecureServerInsecureClient",
"_not_found_test.NotFoundTest",
"_read_some_but_not_all_responses_test.ReadSomeButNotAllResponsesTest",
"_rpc_test.RPCTest",
"_sanity_test.Sanity",
"_secure_interop_test.SecureInteropTest",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,251 @@
# Copyright 2016, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

"""Test a corner-case at the level of the Cython API."""

import threading
import unittest

from grpc._cython import cygrpc

_INFINITE_FUTURE = cygrpc.Timespec(float('+inf'))
_EMPTY_FLAGS = 0
_EMPTY_METADATA = cygrpc.Metadata(())


class _ServerDriver(object):

def __init__(self, completion_queue, shutdown_tag):
self._condition = threading.Condition()
self._completion_queue = completion_queue
self._shutdown_tag = shutdown_tag
self._events = []
self._saw_shutdown_tag = False

def start(self):
def in_thread():
while True:
event = self._completion_queue.poll()
with self._condition:
self._events.append(event)
self._condition.notify()
if event.tag is self._shutdown_tag:
self._saw_shutdown_tag = True
break
thread = threading.Thread(target=in_thread)
thread.start()

def done(self):
with self._condition:
return self._saw_shutdown_tag

def first_event(self):
with self._condition:
while not self._events:
self._condition.wait()
return self._events[0]

def events(self):
with self._condition:
while not self._saw_shutdown_tag:
self._condition.wait()
return tuple(self._events)


class _QueueDriver(object):

def __init__(self, condition, completion_queue, due):
self._condition = condition
self._completion_queue = completion_queue
self._due = due
self._events = []
self._returned = False

def start(self):
def in_thread():
while True:
event = self._completion_queue.poll()
with self._condition:
self._events.append(event)
self._due.remove(event.tag)
self._condition.notify_all()
if not self._due:
self._returned = True
return
thread = threading.Thread(target=in_thread)
thread.start()

def done(self):
with self._condition:
return self._returned

def event_with_tag(self, tag):
with self._condition:
while True:
for event in self._events:
if event.tag is tag:
return event
self._condition.wait()

def events(self):
with self._condition:
while not self._returned:
self._condition.wait()
return tuple(self._events)


class ReadSomeButNotAllResponsesTest(unittest.TestCase):

def testReadSomeButNotAllResponses(self):
server_completion_queue = cygrpc.CompletionQueue()
server = cygrpc.Server()
server.register_completion_queue(server_completion_queue)
port = server.add_http2_port('[::]:0')
server.start()
channel = cygrpc.Channel('localhost:{}'.format(port))

server_shutdown_tag = 'server_shutdown_tag'
server_driver = _ServerDriver(server_completion_queue, server_shutdown_tag)
server_driver.start()

client_condition = threading.Condition()
client_due = set()
client_completion_queue = cygrpc.CompletionQueue()
client_driver = _QueueDriver(
client_condition, client_completion_queue, client_due)
client_driver.start()

server_call_condition = threading.Condition()
server_send_initial_metadata_tag = 'server_send_initial_metadata_tag'
server_send_first_message_tag = 'server_send_first_message_tag'
server_send_second_message_tag = 'server_send_second_message_tag'
server_complete_rpc_tag = 'server_complete_rpc_tag'
server_call_due = set((
server_send_initial_metadata_tag,
server_send_first_message_tag,
server_send_second_message_tag,
server_complete_rpc_tag,
))
server_call_completion_queue = cygrpc.CompletionQueue()
server_call_driver = _QueueDriver(
server_call_condition, server_call_completion_queue, server_call_due)
server_call_driver.start()

server_rpc_tag = 'server_rpc_tag'
request_call_result = server.request_call(
server_call_completion_queue, server_completion_queue, server_rpc_tag)

client_call = channel.create_call(
None, _EMPTY_FLAGS, client_completion_queue, b'/twinkies', None,
_INFINITE_FUTURE)
client_receive_initial_metadata_tag = 'client_receive_initial_metadata_tag'
client_complete_rpc_tag = 'client_complete_rpc_tag'
with client_condition:
client_receive_initial_metadata_start_batch_result = (
client_call.start_batch(cygrpc.Operations([
cygrpc.operation_receive_initial_metadata(_EMPTY_FLAGS),
]), client_receive_initial_metadata_tag))
client_due.add(client_receive_initial_metadata_tag)
client_complete_rpc_start_batch_result = (
client_call.start_batch(cygrpc.Operations([
cygrpc.operation_send_initial_metadata(
_EMPTY_METADATA, _EMPTY_FLAGS),
cygrpc.operation_send_close_from_client(_EMPTY_FLAGS),
cygrpc.operation_receive_status_on_client(_EMPTY_FLAGS),
]), client_complete_rpc_tag))
client_due.add(client_complete_rpc_tag)

server_rpc_event = server_driver.first_event()

with server_call_condition:
server_send_initial_metadata_start_batch_result = (
server_rpc_event.operation_call.start_batch(cygrpc.Operations([
cygrpc.operation_send_initial_metadata(
_EMPTY_METADATA, _EMPTY_FLAGS),
]), server_send_initial_metadata_tag))
server_send_first_message_start_batch_result = (
server_rpc_event.operation_call.start_batch(cygrpc.Operations([
cygrpc.operation_send_message(b'\x07', _EMPTY_FLAGS),
]), server_send_first_message_tag))
server_send_initial_metadata_event = server_call_driver.event_with_tag(
server_send_initial_metadata_tag)
server_send_first_message_event = server_call_driver.event_with_tag(
server_send_first_message_tag)
with server_call_condition:
server_send_second_message_start_batch_result = (
server_rpc_event.operation_call.start_batch(cygrpc.Operations([
cygrpc.operation_send_message(b'\x07', _EMPTY_FLAGS),
]), server_send_second_message_tag))
server_complete_rpc_start_batch_result = (
server_rpc_event.operation_call.start_batch(cygrpc.Operations([
cygrpc.operation_receive_close_on_server(_EMPTY_FLAGS),
cygrpc.operation_send_status_from_server(
cygrpc.Metadata(()), cygrpc.StatusCode.ok, b'test details',
_EMPTY_FLAGS),
]), server_complete_rpc_tag))
server_send_second_message_event = server_call_driver.event_with_tag(
server_send_second_message_tag)
server_complete_rpc_event = server_call_driver.event_with_tag(
server_complete_rpc_tag)
server_call_driver.events()

with client_condition:
client_receive_first_message_tag = 'client_receive_first_message_tag'
client_receive_first_message_start_batch_result = (
client_call.start_batch(cygrpc.Operations([
cygrpc.operation_receive_message(_EMPTY_FLAGS),
]), client_receive_first_message_tag))
client_due.add(client_receive_first_message_tag)
client_receive_first_message_event = client_driver.event_with_tag(
client_receive_first_message_tag)

client_call_cancel_result = client_call.cancel()
client_driver.events()

server.shutdown(server_completion_queue, server_shutdown_tag)
server.cancel_all_calls()
server_driver.events()

self.assertEqual(cygrpc.CallError.ok, request_call_result)
self.assertEqual(
cygrpc.CallError.ok, server_send_initial_metadata_start_batch_result)
self.assertEqual(
cygrpc.CallError.ok, client_receive_initial_metadata_start_batch_result)
self.assertEqual(
cygrpc.CallError.ok, client_complete_rpc_start_batch_result)
self.assertEqual(cygrpc.CallError.ok, client_call_cancel_result)
self.assertIs(server_rpc_tag, server_rpc_event.tag)
self.assertEqual(
cygrpc.CompletionType.operation_complete, server_rpc_event.type)
self.assertIsInstance(server_rpc_event.operation_call, cygrpc.Call)
self.assertEqual(0, len(server_rpc_event.batch_operations))


if __name__ == '__main__':
unittest.main(verbosity=2)

0 comments on commit debd151

Please sign in to comment.