Skip to content

Commit

Permalink
Merge pull request gevent#1722 from gevent/issue1670
Browse files Browse the repository at this point in the history
Install thread profiling/tracing hooks in ThreadPool worker threads while the task runs
  • Loading branch information
jamadden authored Dec 18, 2020
2 parents abcd3a9 + 60cc6b0 commit 0eefec3
Show file tree
Hide file tree
Showing 3 changed files with 146 additions and 2 deletions.
6 changes: 6 additions & 0 deletions docs/changes/1678.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
Make worker threads created by :class:`gevent.threadpool.ThreadPool` install
the :func:`threading.setprofile` and :func:`threading.settrace` hooks
while tasks are running. This provides visibility to profiling and
tracing tools like yappi.

Reported by Suhail Muhammed.
99 changes: 99 additions & 0 deletions src/gevent/tests/test__threadpool.py
Original file line number Diff line number Diff line change
Expand Up @@ -721,5 +721,104 @@ def wake():
self.assertIsNotNone(tr.receiver)


class TestWorkerProfileAndTrace(TestCase):
# Worker threads should execute the test and trace functions.
# (When running the user code.)
# https://github.com/gevent/gevent/issues/1670

old_profile = None
old_trace = None

def setUp(self):
super(TestWorkerProfileAndTrace, self).setUp()
self.old_profile = gevent.threadpool._get_thread_profile()
self.old_trace = gevent.threadpool._get_thread_trace()

def tearDown(self):
import threading
threading.setprofile(self.old_profile)
threading.settrace(self.old_trace)

def test_get_profile(self):
import threading
threading.setprofile(self)
self.assertIs(gevent.threadpool._get_thread_profile(), self)

def test_get_trace(self):
import threading
threading.settrace(self)
self.assertIs(gevent.threadpool._get_thread_trace(), self)

def _test_func_called_in_task(self, func):
import threading
import sys

setter = getattr(threading, 'set' + func)
getter = getattr(sys, 'get' + func)

called = [0]
def callback(*_args):
called[0] += 1

def task():
test.assertIsNotNone(getter)
return 1701


before_task = []
after_task = []

test = self
class Pool(ThreadPool):
class _WorkerGreenlet(ThreadPool._WorkerGreenlet):
# pylint:disable=signature-differs
def _before_run_task(self, func, *args):
before_task.append(func)
before_task.append(getter())
ThreadPool._WorkerGreenlet._before_run_task(self, func, *args)
before_task.append(getter())

def _after_run_task(self, func, *args):
after_task.append(func)
after_task.append(getter())
ThreadPool._WorkerGreenlet._after_run_task(self, func, *args)
after_task.append(getter())

self.ClassUnderTest = Pool

pool = self._makeOne(1, create_all_worker_threads=True)
assert isinstance(pool, Pool)

# Do this after creating the pool and its thread to verify we don't
# capture the function at thread creation time.
setter(callback)


res = pool.apply(task)
self.assertEqual(res, 1701)
self.assertGreaterEqual(called[0], 1)

# Shutdown the pool. PyPy2.7-7.3.1 on Windows/Appveyor was
# properly seeing the before_task value, but after_task was empty.
# That suggested a memory consistency type issue, where the updates
# written by the other thread weren't fully visible to this thread
# yet. Try to kill it to see if that helps. (Couldn't reproduce
# on macOS).
#
# https://ci.appveyor.com/project/jamadden/gevent/build/job/wo9likk85cduui7n#L867
pool.kill()

# The function is active only for the scope of the function
self.assertEqual(before_task, [task, None, callback])
self.assertEqual(after_task, [task, callback, None])

def test_profile_called_in_task(self):
self._test_func_called_in_task('profile')

def test_trace_called_in_task(self):
self._test_func_called_in_task('trace')



if __name__ == '__main__':
greentest.main()
43 changes: 41 additions & 2 deletions src/gevent/threadpool.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,17 @@ def _format_hub(hub):
hub.__class__.__name__, id(hub), hub.thread_ident
)


def _get_thread_profile(_sys=sys):
if 'threading' in _sys.modules:
return _sys.modules['threading']._profile_hook


def _get_thread_trace(_sys=sys):
if 'threading' in _sys.modules:
return _sys.modules['threading']._trace_hook


class _WorkerGreenlet(RawGreenlet):
# Exists to produce a more useful repr for worker pool
# threads/greenlets, and manage the communication of the worker
Expand Down Expand Up @@ -137,12 +148,27 @@ def __print_tb(tb, stderr):
file=stderr)
tb = tb.tb_next

def _before_run_task(self, func, args, kwargs, thread_result,
_sys=sys,
_get_thread_profile=_get_thread_profile,
_get_thread_trace=_get_thread_trace):
# pylint:disable=unused-argument
_sys.setprofile(_get_thread_profile())
_sys.settrace(_get_thread_trace())

def _after_run_task(self, func, args, kwargs, thread_result, _sys=sys):
# pylint:disable=unused-argument
_sys.setprofile(None)
_sys.settrace(None)

def __run_task(self, func, args, kwargs, thread_result):
self._before_run_task(func, args, kwargs, thread_result)
try:
thread_result.set(func(*args, **kwargs))
except: # pylint:disable=bare-except
thread_result.handle_error((self, func), self._exc_info())
finally:
self._after_run_task(func, args, kwargs, thread_result)
del func, args, kwargs, thread_result

def run(self):
Expand Down Expand Up @@ -236,12 +262,23 @@ class ThreadPool(GroupMappingMixin):
The `len` of instances of this class is the number of enqueued
(unfinished) tasks.
Just before a task starts running in a worker thread,
the values of :func:`threading.setprofile` and :func:`threading.settrace`
are consulted. Any values there are installed in that thread for the duration
of the task (using :func:`sys.setprofile` and :func:`sys.settrace`, respectively).
(Because worker threads are long-lived and outlast any given task, this arrangement
lets the hook functions change between tasks, but does not let them see the
bookkeeping done by the worker thread itself.)
.. caution:: Instances of this class are only true if they have
unfinished tasks.
.. versionchanged:: 1.5a3
The undocumented ``apply_e`` function, deprecated since 1.1,
was removed.
.. versionchanged:: NEXT
Install the profile and trace functions in the worker thread while
the worker thread is running the supplied task.
"""

__slots__ = (
Expand All @@ -268,6 +305,8 @@ class ThreadPool(GroupMappingMixin):
'task_queue',
)

_WorkerGreenlet = _WorkerGreenlet

def __init__(self, maxsize, hub=None):
if hub is None:
hub = get_hub()
Expand Down Expand Up @@ -421,7 +460,7 @@ def _adjust_step(self):
self.fork_watcher.stop()

def _adjust_wait(self):
delay = 0.0001
delay = self.hub.loop.approx_timer_resolution
while True:
self._adjust_step()
if len(self._worker_greenlets) <= self._maxsize:
Expand All @@ -437,7 +476,7 @@ def adjust(self):
self.manager = Greenlet.spawn(self._adjust_wait)

def _add_thread(self):
_WorkerGreenlet(self)
self._WorkerGreenlet(self)

def spawn(self, func, *args, **kwargs):
"""
Expand Down

0 comments on commit 0eefec3

Please sign in to comment.