Merge desynchronize-2673-4: Brief description
Author: glyph

Reviewer: exarkun, hawkowl

Fixes: twisted#2673

Fix an intermittent test failure, rewriting nearly everything about
multithreading in Twisted in the process.

git-svn-id: svn://svn.twistedmatrix.com/svn/Twisted/trunk@45376 bbbe8e31-12d6-0310-92fd-ac37d47ddeeb
glyph committed Aug 3, 2015
1 parent ee7e85a commit b572e7e
Showing 19 changed files with 1,680 additions and 243 deletions.
114 changes: 67 additions & 47 deletions docs/core/howto/threading.rst
@@ -6,74 +6,95 @@
Using Threads in Twisted
========================

Running code in a thread-safe manner
------------------------------------
How Twisted Uses Threads Itself
-------------------------------

Most code in Twisted is not thread-safe.
All callbacks registered with the reactor - for example, ``dataReceived``, ``connectionLost``, or any higher-level method that comes from these, such as ``render_GET`` in twisted.web, or a callback added to a ``Deferred`` - are called from ``reactor.run``.
The terminology around this is that we say these callbacks are run in the "main thread", or "reactor thread" or "I/O thread".

Therefore, internally, Twisted makes very little use of threads.
This is not to say that it makes *no* use of threads; there are plenty of APIs which have no non-blocking equivalent, so when Twisted needs to call those, it calls them in a thread.
One prominent example of this is system hostname resolution: unless you have configured Twisted to use its own DNS client in ``twisted.names``, it will have to use your operating system's blocking APIs to map host names to IP addresses, in the reactor's thread pool.
However, this is something you only need to know about for resource-tuning purposes, like setting the number of threads to use; otherwise, it is an implementation detail you can ignore.

It is a common mistake to think that because Twisted can manage multiple connections at once, things are happening in multiple threads, and so you need to carefully manage locks.
Lucky for you, Twisted does most things in one thread!
This document will explain how to interact with existing APIs which need to be run within their own threads because they block.
If you're just using Twisted's own APIs, the rule for threads is simply "don't use them".

Invoking Twisted From Other Threads
-----------------------------------

Methods within Twisted may only be invoked from the reactor thread unless otherwise noted.
Very few things within Twisted are thread-safe.
For example, writing data to a transport from a protocol is not thread-safe.
Therefore, we want a way to schedule methods to be run in the main event loop.
This means that if you start a thread and call a Twisted method, you might get correct behavior... or you might get hangs, crashes, or corrupted data.
So don't do it.

The right way to call methods on the reactor from another thread, and therefore any objects which might call methods on the reactor, is to give a function to the reactor to execute within its own thread.
This can be done using the function :api:`twisted.internet.interfaces.IReactorThreads.callFromThread <callFromThread>`::

    from twisted.internet import reactor
    def notThreadSafe(someProtocol, message):
        someProtocol.transport.write(b"a message: " + message)
    def callFromWhateverThreadYouWant(someProtocol):
        # callFromThread forwards its extra arguments to notThreadSafe.
        reactor.callFromThread(notThreadSafe, someProtocol, b"hello")

def notThreadSafe(x):
"""do something that isn't thread-safe"""
# ...
In this example, ``callFromWhateverThreadYouWant`` is thread-safe and can be invoked by any thread, but ``notThreadSafe`` should only ever be called by code running in the thread where ``reactor.run`` is running.

def threadSafeScheduler():
"""Run in thread-safe manner."""
reactor.callFromThread(notThreadSafe, 3) # will run 'notThreadSafe(3)'
# in the event loop
reactor.run()
.. note::

There are many objects within Twisted that represent values - for example, :api:`twisted.python.filepath.FilePath <FilePath>` and :api:`twisted.python.urlpath.URLPath <URLPath>` - which you may construct yourself.
These may be safely constructed and used within a non-reactor thread as long as they are not shared with other threads.
However, you should be sure that these objects do not share any state, especially not with the reactor.
One good rule of thumb is that any object whose methods return ``Deferred``\ s is almost certainly touching the reactor at some point, and should never be accessed from a non-reactor thread.

Running code in threads
Running Code In Threads
-----------------------

Sometimes we may want to run methods in threads.
For example, in order to access blocking APIs.
Twisted provides methods for doing so using the :api:`twisted.internet.interfaces.IReactorThreads <IReactorThreads>` API.
Additional utility functions are provided in :api:`twisted.internet.threads <twisted.internet.threads>`.
Basically, these methods allow us to queue methods to be run by a thread pool.
Sometimes we may want to run code in a non-reactor thread, to avoid blocking the reactor.
Twisted provides an API for doing so, the :api:`twisted.internet.interfaces.IReactorThreads.callInThread <callInThread>` method on the reactor.

For example, to run a method in a thread we can do::
For example, to run a method in a non-reactor thread we can do::

from twisted.internet import reactor

def aSillyBlockingMethod(x):
import time
time.sleep(2)
print x
print(x)

# run method in thread
reactor.callInThread(aSillyBlockingMethod, "2 seconds have passed")
reactor.run()

``callInThread`` will put your code into a queue, to be run by the next available thread in the reactor's thread pool.
This means that depending on what other work has been submitted to the pool, your method may not run immediately.

Utility Methods
---------------
.. note::
Keep in mind that ``callInThread`` can only concurrently run a fixed maximum number of tasks, and all users of the reactor are sharing that limit.
Therefore, you should not submit *tasks which depend on other tasks in order to complete* to be executed by ``callInThread``.
An example of such a task would be something like this::

The utility methods are not part of the :api:`twisted.internet.reactor <reactor>` APIs, but are implemented in :api:`twisted.internet.threads <threads>`.
q = Queue()
def blocker():
print(q.get() + q.get())
def unblocker(a, b):
q.put(a)
q.put(b)

If we have multiple methods to run sequentially within a thread, we can do::
In this case, ``blocker`` will block *forever* unless ``unblocker`` can successfully run to give it inputs; similarly, ``unblocker`` might block forever if ``blocker`` is not run to consume its outputs.
So if you had a threadpool of maximum size X, and you ran ``for each in range(X): reactor.callInThread(blocker)``, the reactor threadpool would be wedged forever, unable to process more work or even shut down.

from twisted.internet import reactor, threads
See "Managing the Reactor Thread Pool" below to tune these limits.
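
The starvation described in this note can be reproduced without Twisted. The sketch below is only an illustration under stated assumptions: it uses the standard library's ``ThreadPoolExecutor`` as a stand-in for the reactor thread pool, ``POOL_SIZE`` and ``run_demo`` are hypothetical names, and the blockers give up after a short timeout so that the demonstration terminates instead of wedging.

```python
import queue
from concurrent.futures import ThreadPoolExecutor

POOL_SIZE = 2  # hypothetical maximum, standing in for the reactor pool's limit


def run_demo():
    q = queue.Queue()
    events = []  # each thread only appends, so the order of entries is meaningful

    def blocker():
        # Like ``blocker`` above, but it gives up instead of waiting forever.
        try:
            total = q.get(timeout=0.2) + q.get(timeout=0.2)
            events.append("blocker got %d" % total)
        except queue.Empty:
            events.append("blocker gave up")

    def unblocker(a, b):
        events.append("unblocker started")
        q.put(a)
        q.put(b)

    with ThreadPoolExecutor(max_workers=POOL_SIZE) as pool:
        # Occupy every worker thread with a task that waits on another task.
        for _ in range(POOL_SIZE):
            pool.submit(blocker)
        # Queued behind the blockers: it cannot start until a thread is free.
        pool.submit(unblocker, 1, 2)
    return events


if __name__ == "__main__":
    print(run_demo())
```

The first recorded event is always a blocker giving up, because ``unblocker`` cannot be scheduled while every thread in the pool is occupied. Without the timeouts, the pool would never make progress.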

def aSillyBlockingMethodOne(x):
import time
time.sleep(2)
print x
Getting Results
---------------

def aSillyBlockingMethodTwo(x):
print x
``callInThread`` and ``callFromThread`` allow you to move the execution of your code out of and into the reactor thread, respectively, but that isn't always enough.

# run both methods sequentially in a thread
commands = [(aSillyBlockingMethodOne, ["Calling First"], {})]
commands.append((aSillyBlockingMethodTwo, ["And the second"], {}))
threads.callMultipleInThread(commands)
reactor.run()
When we run some code, we often want to know what its result was. For this, Twisted provides two methods: :api:`twisted.internet.threads.deferToThread <deferToThread>` and :api:`twisted.internet.threads.blockingCallFromThread <blockingCallFromThread>`, defined in the ``twisted.internet.threads`` module.

For functions whose results we wish to get, we can have the result returned as a Deferred::
To get a result from some blocking code back into the reactor thread, we can use :api:`twisted.internet.threads.deferToThread <deferToThread>` to execute it instead of ``callInThread``.

from twisted.internet import reactor, threads

@@ -82,14 +103,14 @@ For functions whose results we wish to get, we can have the result returned as a
return 3

def printResult(x):
print x
print(x)

# run method in thread and get result as defer.Deferred
d = threads.deferToThread(doLongCalculation)
d.addCallback(printResult)
reactor.run()

If you wish to call a method in the reactor thread and get its result, you can use :api:`twisted.internet.threads.blockingCallFromThread <blockingCallFromThread>`::
Similarly, if you want some code running in a non-reactor thread to invoke some code in the reactor thread and get its result, you can use :api:`twisted.internet.threads.blockingCallFromThread <blockingCallFromThread>`::

from twisted.internet import threads, reactor, defer
from twisted.web.client import getPage
@@ -111,18 +132,17 @@ If you wish to call a method in the reactor thread and get its result, you can u
``blockingCallFromThread`` will return the object or raise the exception returned or raised by the function passed to it.
If the function passed to it returns a Deferred, it will return the value the Deferred is called back with or raise the exception it is errbacked with.


Managing the Thread Pool
------------------------

The thread pool is implemented by :api:`twisted.python.threadpool.ThreadPool <ThreadPool>`.
Managing the Reactor Thread Pool
--------------------------------

We may want to modify the size of the thread pool, increasing or decreasing the number of threads in use.
We can do this do this quite easily::
We can do this::

from twisted.internet import reactor

reactor.suggestThreadPoolSize(30)

The default size of the thread pool depends on the reactor being used; the default reactor uses a minimum size of 5 and a maximum size of 10.
Be careful that you understand threads and their resource usage before drastically altering the thread pool sizes.

The reactor thread pool is implemented by :api:`twisted.python.threadpool.ThreadPool <ThreadPool>`.
To access methods on this object for more advanced tuning and monitoring (see the API documentation for details) you can get the thread pool with :api:`twisted.internet.interfaces.IReactorThreads.getThreadPool <getThreadPool>`.
25 changes: 25 additions & 0 deletions twisted/_threads/__init__.py
@@ -0,0 +1,25 @@
# -*- test-case-name: twisted.test.test_paths -*-
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.

"""
Twisted integration with operating system threads.
"""

from __future__ import absolute_import, division, print_function

from ._threadworker import ThreadWorker, LockWorker
from ._ithreads import IWorker, AlreadyQuit
from ._team import Team
from ._memory import createMemoryWorker
from ._pool import pool

__all__ = [
    "ThreadWorker",
    "LockWorker",
    "IWorker",
    "AlreadyQuit",
    "Team",
    "createMemoryWorker",
    "pool",
]
46 changes: 46 additions & 0 deletions twisted/_threads/_convenience.py
@@ -0,0 +1,46 @@
# -*- test-case-name: twisted._threads.test.test_convenience -*-
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.

"""
Common functionality used within the implementation of various workers.
"""

from __future__ import absolute_import, division, print_function

from ._ithreads import AlreadyQuit


class Quit(object):
    """
    A flag representing whether a worker has been quit.

    @ivar isSet: Whether this flag is set.
    @type isSet: L{bool}
    """

    def __init__(self):
        """
        Create a L{Quit} un-set.
        """
        self.isSet = False


    def set(self):
        """
        Set the flag if it has not been set.

        @raise AlreadyQuit: If it has been set.
        """
        self.check()
        self.isSet = True


    def check(self):
        """
        Check if the flag has been set.

        @raise AlreadyQuit: If it has been set.
        """
        if self.isSet:
            raise AlreadyQuit()
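
A short sketch of how the flag above behaves when it is set twice. To keep the snippet runnable without Twisted, it uses condensed stand-in copies of ``Quit`` and ``AlreadyQuit``; the helper ``quit_twice`` is a hypothetical name introduced only for this illustration.

```python
class AlreadyQuit(Exception):
    """Stand-in for the AlreadyQuit exception, so this runs without Twisted."""


class Quit(object):
    """Condensed copy of the Quit flag shown above."""

    def __init__(self):
        self.isSet = False

    def set(self):
        # check() raises if the flag is already set, so set() works only once.
        self.check()
        self.isSet = True

    def check(self):
        if self.isSet:
            raise AlreadyQuit()


def quit_twice():
    flag = Quit()
    flag.check()  # not set yet: returns quietly
    flag.set()    # the first call sets the flag
    try:
        flag.set()  # the second call is rejected
    except AlreadyQuit:
        return "second set() raised AlreadyQuit"
    return "second set() succeeded"


if __name__ == "__main__":
    print(quit_twice())
```

Because ``set`` calls ``check`` first, a worker that delegates to this flag gets the "quit may only be called once" behaviour without repeating the test in each implementation.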
61 changes: 61 additions & 0 deletions twisted/_threads/_ithreads.py
@@ -0,0 +1,61 @@
# -*- test-case-name: twisted._threads.test -*-
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.

"""
Interfaces related to threads.
"""

from __future__ import absolute_import, division, print_function

from zope.interface import Interface


class AlreadyQuit(Exception):
    """
    This worker is dead and cannot execute more instructions.
    """



class IWorker(Interface):
    """
    A worker that can perform some work concurrently.

    All methods on this interface must be thread-safe.
    """

    def do(task):
        """
        Perform the given task.

        As an interface, this method makes no specific claims about concurrent
        execution.  An L{IWorker}'s C{do} implementation may defer execution
        for later on the same thread, immediately on a different thread, or
        some combination of the two.  It is valid for a C{do} method to
        schedule C{task} in such a way that it may never be executed.

        It is important for some implementations to provide specific properties
        with respect to where C{task} is executed, of course, and client code
        may rely on a more specific implementation of C{do} than L{IWorker}.

        @param task: a task to call in a thread or other concurrent context.
        @type task: 0-argument callable

        @raise AlreadyQuit: if C{quit} has been called.
        """

    def quit():
        """
        Free any resources associated with this L{IWorker} and cause it to
        reject all future work.

        @raise AlreadyQuit: if this method has already been called.
        """


class IExclusiveWorker(IWorker):
    """
    Like L{IWorker}, but with the additional guarantee that the callables
    passed to C{do} will not be called concurrently with each other.
    """
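
To make the ``do``/``quit`` contract concrete, here is a minimal sketch of a worker that also offers the exclusivity guarantee. ``SerialWorker`` and ``demo`` are hypothetical names, not part of this commit: the class runs each task immediately on the calling thread while holding a lock, skips the ``zope.interface`` declaration, and uses a stand-in ``AlreadyQuit`` so that it runs without Twisted.

```python
import threading
import time


class AlreadyQuit(Exception):
    """Stand-in for the AlreadyQuit exception, so this runs without Twisted."""


class SerialWorker(object):
    """
    Hypothetical worker: runs each task on the calling thread, one at a time.
    A task must not call do() on the same worker, or it would deadlock.
    """

    def __init__(self):
        self._lock = threading.Lock()
        self._quit = False

    def do(self, task):
        with self._lock:  # tasks never overlap with each other
            if self._quit:
                raise AlreadyQuit()
            task()

    def quit(self):
        with self._lock:
            if self._quit:
                raise AlreadyQuit()
            self._quit = True


def demo():
    worker = SerialWorker()
    active = [0]  # number of tasks currently running
    peak = [0]    # highest value 'active' ever reached

    def task():
        active[0] += 1
        peak[0] = max(peak[0], active[0])
        time.sleep(0.01)
        active[0] -= 1

    threads = [threading.Thread(target=worker.do, args=(task,))
               for _ in range(4)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()

    worker.quit()
    try:
        worker.do(task)
        rejected = False
    except AlreadyQuit:
        rejected = True
    return peak[0], rejected


if __name__ == "__main__":
    print(demo())
```

Running tasks inline is only one of the scheduling choices the ``do`` docstring permits; a thread-backed worker would queue the task and return immediately instead.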
"""
71 changes: 71 additions & 0 deletions twisted/_threads/_memory.py
@@ -0,0 +1,71 @@
# -*- test-case-name: twisted._threads.test.test_memory -*-
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.

"""
Implementation of an in-memory worker that defers execution.
"""

from __future__ import absolute_import, division, print_function

from zope.interface import implementer

from . import IWorker
from ._convenience import Quit

NoMoreWork = object()

@implementer(IWorker)
class MemoryWorker(object):
    """
    An L{IWorker} that queues work for later performance.

    @ivar _quit: a flag indicating whether this worker has been quit.
    @type _quit: L{Quit}
    """

    def __init__(self, pending=list):
        """
        Create a L{MemoryWorker}.
        """
        self._quit = Quit()
        self._pending = pending()


    def do(self, work):
        """
        Queue some work to perform later; see L{createMemoryWorker}.

        @param work: The work to perform.
        """
        self._quit.check()
        self._pending.append(work)


    def quit(self):
        """
        Quit this worker.
        """
        self._quit.set()
        self._pending.append(NoMoreWork)



def createMemoryWorker():
    """
    Create an L{IWorker} that does nothing but defer work, to be performed
    later.

    @return: a worker that will enqueue work to perform later, and a callable
        that will perform one element of that work.
    @rtype: 2-L{tuple} of (L{IWorker}, L{callable})
    """
    def perform():
        if not worker._pending:
            return False
        if worker._pending[0] is NoMoreWork:
            return False
        worker._pending.pop(0)()
        return True
    worker = MemoryWorker()
    return (worker, perform)
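
A sketch of how the ``(worker, perform)`` pair might be driven, for instance from a test. So that it runs without Twisted or ``zope.interface``, the snippet uses condensed stand-ins for ``AlreadyQuit``, ``MemoryWorker`` and ``createMemoryWorker`` (with a plain boolean in place of ``Quit``); ``demo`` is a hypothetical helper.

```python
class AlreadyQuit(Exception):
    """Stand-in for the AlreadyQuit exception, so this runs without Twisted."""


NoMoreWork = object()


class MemoryWorker(object):
    """Condensed copy of the MemoryWorker above, without the zope declaration."""

    def __init__(self):
        self._quit = False
        self._pending = []

    def do(self, work):
        if self._quit:
            raise AlreadyQuit()
        self._pending.append(work)

    def quit(self):
        if self._quit:
            raise AlreadyQuit()
        self._quit = True
        self._pending.append(NoMoreWork)


def createMemoryWorker():
    def perform():
        # Run at most one queued item; report whether anything was run.
        if not worker._pending:
            return False
        if worker._pending[0] is NoMoreWork:
            return False
        worker._pending.pop(0)()
        return True
    worker = MemoryWorker()
    return (worker, perform)


def demo():
    worker, perform = createMemoryWorker()
    log = []
    worker.do(lambda: log.append("first"))
    worker.do(lambda: log.append("second"))
    ran_early = list(log)  # still empty: do() only queues the work
    results = [perform(), perform(), perform()]  # the third call finds nothing
    worker.quit()
    results.append(perform())  # after quit() there is no more work to run
    return ran_early, log, results


if __name__ == "__main__":
    print(demo())
```

Since nothing runs until ``perform`` is called, the caller decides exactly when each queued task executes, which makes code written against ``IWorker`` testable without real threads.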