Skip to content

Commit

Permalink
Implement IOLoop.run_in_executor (tornadoweb#2067)
Browse files Browse the repository at this point in the history
  • Loading branch information
mivade authored and bdarnell committed Oct 21, 2017
1 parent 37081d7 commit 648b3e9
Show file tree
Hide file tree
Showing 2 changed files with 87 additions and 1 deletion.
29 changes: 29 additions & 0 deletions tornado/ioloop.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,10 @@
except ImportError:
signal = None

try:
from concurrent.futures import ThreadPoolExecutor
except ImportError:
ThreadPoolExecutor = None

if PY3:
import _thread as thread
Expand Down Expand Up @@ -635,6 +639,29 @@ def add_future(self, future, callback):
future.add_done_callback(
lambda future: self.add_callback(callback, future))

def run_in_executor(self, executor, func, *args):
"""Runs a function in a ``concurrent.futures.Executor``. If
``executor`` is ``None``, the IO loop's default executor will be used.
Use `functools.partial` to pass keyword arguments to `func`.
"""
if ThreadPoolExecutor is None:
raise RuntimeError(
"concurrent.futures is required to use IOLoop.run_in_executor")

if executor is None:
if not hasattr(self, '_executor'):
from tornado.process import cpu_count
self._executor = ThreadPoolExecutor(max_workers=(cpu_count() * 5))
executor = self._executor

return executor.submit(func, *args)

def set_default_executor(self, executor):
"""Sets the default executor to use with :meth:`run_in_executor`."""
self._executor = executor

def _run_callback(self, callback):
"""Runs a callback with error handling.
Expand Down Expand Up @@ -777,6 +804,8 @@ def close(self, all_fds=False):
self._impl.close()
self._callbacks = None
self._timeouts = None
if hasattr(self, '_executor'):
self._executor.shutdown()

def add_handler(self, fd, handler, events):
fd, obj = self.split_fd(fd)
Expand Down
59 changes: 58 additions & 1 deletion tornado/test/ioloop_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@
from tornado.log import app_log
from tornado.platform.select import _Select
from tornado.stack_context import ExceptionStackContext, StackContext, wrap, NullContext
from tornado.testing import AsyncTestCase, bind_unused_port, ExpectLog
from tornado.testing import AsyncTestCase, bind_unused_port, ExpectLog, gen_test
from tornado.test.util import unittest, skipIfNonUnix, skipOnTravis, skipBefore35, exec_test
from tornado.concurrent import Future

try:
from concurrent import futures
Expand Down Expand Up @@ -598,6 +599,62 @@ def handle_exception(typ, value, traceback):
self.assertEqual(self.exception.args[0], "callback")
self.assertEqual(self.future.exception().args[0], "worker")

@gen_test
def test_run_in_executor_gen(self):
event1 = threading.Event()
event2 = threading.Event()

def callback(self_event, other_event):
self_event.set()
time.sleep(0.01)
self.assertTrue(other_event.is_set())
return self_event

res = yield [
IOLoop.current().run_in_executor(None, callback, event1, event2),
IOLoop.current().run_in_executor(None, callback, event2, event1)
]

self.assertEqual([event1, event2], res)

@skipBefore35
def test_run_in_executor_native(self):
event1 = threading.Event()
event2 = threading.Event()

def callback(self_event, other_event):
self_event.set()
time.sleep(0.01)
self.assertTrue(other_event.is_set())
other_event.wait()
return self_event

namespace = exec_test(globals(), locals(), """
async def main():
res = await gen.multi([
IOLoop.current().run_in_executor(None, callback, event1, event2),
IOLoop.current().run_in_executor(None, callback, event2, event1)
])
self.assertEqual([event1, event2], res)
""")
IOLoop.current().run_sync(namespace['main'])

def test_set_default_executor(self):
class MyExecutor(futures.Executor):
def submit(self, func, *args):
return Future()

event = threading.Event()

def future_func():
event.set()

executor = MyExecutor()
loop = IOLoop.current()
loop.set_default_executor(executor)
loop.run_in_executor(None, future_func)
loop.add_timeout(0.01, lambda: self.assertFalse(event.is_set()))


class TestIOLoopRunSync(unittest.TestCase):
def setUp(self):
Expand Down

0 comments on commit 648b3e9

Please sign in to comment.