Skip to content
Snippets Groups Projects
Commit fd5a3ef9 authored by Masood Malekghassemi's avatar Masood Malekghassemi
Browse files

Don't use a pipe for capturing in test runner

Apparently Python can call arbitrarily deallocation code whenever its
allocator is invoked, which can cause output from gRPC core to stderr,
which can happen on the thread that is emptying the stderr pipe, thus
causing the stderr pipe to deadlock on itself.
parent 0b3e515c
No related branches found
No related tags found
No related merge requests found
...@@ -35,6 +35,7 @@ import os ...@@ -35,6 +35,7 @@ import os
import select import select
import signal import signal
import sys import sys
import tempfile
import threading import threading
import time import time
import unittest import unittest
...@@ -43,72 +44,47 @@ import uuid ...@@ -43,72 +44,47 @@ import uuid
from tests import _loader from tests import _loader
from tests import _result from tests import _result
# This number needs to be large enough to outpace output on stdout and stderr
# from the gRPC core, otherwise we could end up in a potential deadlock. This
# stems from the OS waiting on someone to clear a filled pipe buffer while the
# GIL is held from a write to stderr from gRPC core, but said someone is in
# Python code thus necessitating GIL acquisition.
_READ_BYTES = 2**20
class CaptureFile(object):
"""A context-managed file to redirect output to a byte array.
class CapturePipe(object): Use by invoking `start` (`__enter__`) and at some point invoking `stop`
"""A context-manager pipe to redirect output to a byte array. (`__exit__`). At any point after the initial call to `start` call `output` to
get the current redirected output. Note that we don't currently use file
locking, so calling `output` between calls to `start` and `stop` may muddle
the result (you should only be doing this during a Python-handled interrupt as
a last ditch effort to provide output to the user).
Attributes: Attributes:
_redirect_fd (int): File descriptor of file to redirect writes from. _redirected_fd (int): File descriptor of file to redirect writes from.
_saved_fd (int): A copy of the original value of the redirected file _saved_fd (int): A copy of the original value of the redirected file
descriptor. descriptor.
_read_thread (threading.Thread or None): Thread upon which reads through the _into_file (TemporaryFile or None): File to which writes are redirected.
pipe are performed. Only non-None when self is started. Only non-None when self is started.
_read_fd (int or None): File descriptor of the read end of the redirect
pipe. Only non-None when self is started.
_write_fd (int or None): File descriptor of the write end of the redirect
pipe. Only non-None when self is started.
output (bytearray or None): Redirected output from writes to the redirected
file descriptor. Only valid during and after self has started.
""" """
def __init__(self, fd): def __init__(self, fd):
self._redirect_fd = fd self._redirected_fd = fd
self._saved_fd = os.dup(self._redirect_fd) self._saved_fd = os.dup(self._redirected_fd)
self._read_thread = None self._into_file = None
self._read_fd = None
self._write_fd = None def output(self):
self.output = None """Get all output from the redirected-to file if it exists."""
if self._into_file:
self._into_file.seek(0)
return bytes(self._into_file.read())
else:
return bytes()
def start(self): def start(self):
"""Start redirection of writes to the file descriptor.""" """Start redirection of writes to the file descriptor."""
self._read_fd, self._write_fd = os.pipe() self._into_file = tempfile.TemporaryFile()
os.dup2(self._write_fd, self._redirect_fd) os.dup2(self._into_file.fileno(), self._redirected_fd)
flags = fcntl.fcntl(self._read_fd, fcntl.F_GETFL)
fcntl.fcntl(self._read_fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
self._read_thread = threading.Thread(target=self._read)
# If the user wants to exit from the Python program and hits ctrl-C and the
# read thread is somehow deadlocked with something else, the Python code may
# refuse to exit. This prevents that by making the read thread second-class.
self._read_thread.daemon = True
self._read_thread.start()
def stop(self): def stop(self):
"""Stop redirection of writes to the file descriptor.""" """Stop redirection of writes to the file descriptor."""
os.close(self._write_fd) # n.b. this dup2 call auto-closes self._redirected_fd
os.dup2(self._saved_fd, self._redirect_fd) # auto-close self._redirect_fd os.dup2(self._saved_fd, self._redirected_fd)
self._read_thread.join()
self._read_thread = None
# we waited for the read thread to finish, so _read_fd has been read and we
# can close it.
os.close(self._read_fd)
def _read(self):
"""Read-thread target for self."""
self.output = bytearray()
while True:
select.select([self._read_fd], [], [])
read_bytes = os.read(self._read_fd, _READ_BYTES)
if read_bytes:
self.output.extend(read_bytes)
else:
break
def write_bypass(self, value): def write_bypass(self, value):
"""Bypass the redirection and write directly to the original file. """Bypass the redirection and write directly to the original file.
...@@ -170,8 +146,8 @@ class Runner(object): ...@@ -170,8 +146,8 @@ class Runner(object):
result_out = StringIO.StringIO() result_out = StringIO.StringIO()
result = _result.TerminalResult( result = _result.TerminalResult(
result_out, id_map=lambda case: case_id_by_case[case]) result_out, id_map=lambda case: case_id_by_case[case])
stdout_pipe = CapturePipe(sys.stdout.fileno()) stdout_pipe = CaptureFile(sys.stdout.fileno())
stderr_pipe = CapturePipe(sys.stderr.fileno()) stderr_pipe = CaptureFile(sys.stderr.fileno())
kill_flag = [False] kill_flag = [False]
def sigint_handler(signal_number, frame): def sigint_handler(signal_number, frame):
...@@ -182,7 +158,8 @@ class Runner(object): ...@@ -182,7 +158,8 @@ class Runner(object):
def fault_handler(signal_number, frame): def fault_handler(signal_number, frame):
stdout_pipe.write_bypass( stdout_pipe.write_bypass(
'Received fault signal {}\nstdout:\n{}\n\nstderr:{}\n' 'Received fault signal {}\nstdout:\n{}\n\nstderr:{}\n'
.format(signal_number, stdout_pipe.output, stderr_pipe.output)) .format(signal_number, stdout_pipe.output(),
stderr_pipe.output()))
os._exit(1) os._exit(1)
def check_kill_self(): def check_kill_self():
...@@ -191,9 +168,9 @@ class Runner(object): ...@@ -191,9 +168,9 @@ class Runner(object):
result.stopTestRun() result.stopTestRun()
stdout_pipe.write_bypass(result_out.getvalue()) stdout_pipe.write_bypass(result_out.getvalue())
stdout_pipe.write_bypass( stdout_pipe.write_bypass(
'\ninterrupted stdout:\n{}\n'.format(stdout_pipe.output)) '\ninterrupted stdout:\n{}\n'.format(stdout_pipe.output()))
stderr_pipe.write_bypass( stderr_pipe.write_bypass(
'\ninterrupted stderr:\n{}\n'.format(stderr_pipe.output)) '\ninterrupted stderr:\n{}\n'.format(stderr_pipe.output()))
os._exit(1) os._exit(1)
signal.signal(signal.SIGINT, sigint_handler) signal.signal(signal.SIGINT, sigint_handler)
signal.signal(signal.SIGSEGV, fault_handler) signal.signal(signal.SIGSEGV, fault_handler)
...@@ -223,7 +200,7 @@ class Runner(object): ...@@ -223,7 +200,7 @@ class Runner(object):
# re-raise the exception after forcing the with-block to end # re-raise the exception after forcing the with-block to end
raise raise
result.set_output( result.set_output(
augmented_case.case, stdout_pipe.output, stderr_pipe.output) augmented_case.case, stdout_pipe.output(), stderr_pipe.output())
sys.stdout.write(result_out.getvalue()) sys.stdout.write(result_out.getvalue())
sys.stdout.flush() sys.stdout.flush()
result_out.truncate(0) result_out.truncate(0)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment