Skip to content

Commit

Permalink
Merge pull request grpc#5786 from soltanmm/this-is-not-a-pipe
Browse files Browse the repository at this point in the history
Don't use a pipe for output capturing in Python's test runner
  • Loading branch information
jtattermusch committed Mar 18, 2016
2 parents d848db1 + fd5a3ef commit 220986e
Showing 1 changed file with 34 additions and 57 deletions.
91 changes: 34 additions & 57 deletions src/python/grpcio/tests/_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import select
import signal
import sys
import tempfile
import threading
import time
import unittest
Expand All @@ -43,72 +44,47 @@
from tests import _loader
from tests import _result

# This number needs to be large enough to outpace output on stdout and stderr
# from the gRPC core, otherwise we could end up in a potential deadlock. This
# stems from the OS waiting on someone to clear a filled pipe buffer while the
# GIL is held from a write to stderr from gRPC core, but said someone is in
# Python code thus necessitating GIL acquisition.
_READ_BYTES = 2**20

class CaptureFile(object):
"""A context-managed file to redirect output to a byte array.
class CapturePipe(object):
"""A context-manager pipe to redirect output to a byte array.
Use by invoking `start` (`__enter__`) and at some point invoking `stop`
(`__exit__`). At any point after the initial call to `start` call `output` to
get the current redirected output. Note that we don't currently use file
locking, so calling `output` between calls to `start` and `stop` may muddle
the result (you should only be doing this during a Python-handled interrupt as
a last ditch effort to provide output to the user).
Attributes:
_redirect_fd (int): File descriptor of file to redirect writes from.
_redirected_fd (int): File descriptor of file to redirect writes from.
_saved_fd (int): A copy of the original value of the redirected file
descriptor.
_read_thread (threading.Thread or None): Thread upon which reads through the
pipe are performed. Only non-None when self is started.
_read_fd (int or None): File descriptor of the read end of the redirect
pipe. Only non-None when self is started.
_write_fd (int or None): File descriptor of the write end of the redirect
pipe. Only non-None when self is started.
output (bytearray or None): Redirected output from writes to the redirected
file descriptor. Only valid during and after self has started.
_into_file (TemporaryFile or None): File to which writes are redirected.
Only non-None when self is started.
"""

def __init__(self, fd):
self._redirect_fd = fd
self._saved_fd = os.dup(self._redirect_fd)
self._read_thread = None
self._read_fd = None
self._write_fd = None
self.output = None
self._redirected_fd = fd
self._saved_fd = os.dup(self._redirected_fd)
self._into_file = None

def output(self):
"""Get all output from the redirected-to file if it exists."""
if self._into_file:
self._into_file.seek(0)
return bytes(self._into_file.read())
else:
return bytes()

def start(self):
"""Start redirection of writes to the file descriptor."""
self._read_fd, self._write_fd = os.pipe()
os.dup2(self._write_fd, self._redirect_fd)
flags = fcntl.fcntl(self._read_fd, fcntl.F_GETFL)
fcntl.fcntl(self._read_fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
self._read_thread = threading.Thread(target=self._read)
# If the user wants to exit from the Python program and hits ctrl-C and the
# read thread is somehow deadlocked with something else, the Python code may
# refuse to exit. This prevents that by making the read thread second-class.
self._read_thread.daemon = True
self._read_thread.start()
self._into_file = tempfile.TemporaryFile()
os.dup2(self._into_file.fileno(), self._redirected_fd)

def stop(self):
"""Stop redirection of writes to the file descriptor."""
os.close(self._write_fd)
os.dup2(self._saved_fd, self._redirect_fd) # auto-close self._redirect_fd
self._read_thread.join()
self._read_thread = None
# we waited for the read thread to finish, so _read_fd has been read and we
# can close it.
os.close(self._read_fd)

def _read(self):
"""Read-thread target for self."""
self.output = bytearray()
while True:
select.select([self._read_fd], [], [])
read_bytes = os.read(self._read_fd, _READ_BYTES)
if read_bytes:
self.output.extend(read_bytes)
else:
break
# n.b. this dup2 call auto-closes self._redirected_fd
os.dup2(self._saved_fd, self._redirected_fd)

def write_bypass(self, value):
"""Bypass the redirection and write directly to the original file.
Expand Down Expand Up @@ -170,8 +146,8 @@ def run(self, suite):
result_out = StringIO.StringIO()
result = _result.TerminalResult(
result_out, id_map=lambda case: case_id_by_case[case])
stdout_pipe = CapturePipe(sys.stdout.fileno())
stderr_pipe = CapturePipe(sys.stderr.fileno())
stdout_pipe = CaptureFile(sys.stdout.fileno())
stderr_pipe = CaptureFile(sys.stderr.fileno())
kill_flag = [False]

def sigint_handler(signal_number, frame):
Expand All @@ -182,7 +158,8 @@ def sigint_handler(signal_number, frame):
def fault_handler(signal_number, frame):
stdout_pipe.write_bypass(
'Received fault signal {}\nstdout:\n{}\n\nstderr:{}\n'
.format(signal_number, stdout_pipe.output, stderr_pipe.output))
.format(signal_number, stdout_pipe.output(),
stderr_pipe.output()))
os._exit(1)

def check_kill_self():
Expand All @@ -191,9 +168,9 @@ def check_kill_self():
result.stopTestRun()
stdout_pipe.write_bypass(result_out.getvalue())
stdout_pipe.write_bypass(
'\ninterrupted stdout:\n{}\n'.format(stdout_pipe.output))
'\ninterrupted stdout:\n{}\n'.format(stdout_pipe.output()))
stderr_pipe.write_bypass(
'\ninterrupted stderr:\n{}\n'.format(stderr_pipe.output))
'\ninterrupted stderr:\n{}\n'.format(stderr_pipe.output()))
os._exit(1)
signal.signal(signal.SIGINT, sigint_handler)
signal.signal(signal.SIGSEGV, fault_handler)
Expand Down Expand Up @@ -223,7 +200,7 @@ def check_kill_self():
# re-raise the exception after forcing the with-block to end
raise
result.set_output(
augmented_case.case, stdout_pipe.output, stderr_pipe.output)
augmented_case.case, stdout_pipe.output(), stderr_pipe.output())
sys.stdout.write(result_out.getvalue())
sys.stdout.flush()
result_out.truncate(0)
Expand Down

0 comments on commit 220986e

Please sign in to comment.