Skip to content

Commit

Permalink
Merge branch 'trunk' into 8147-named-systemd-file-descriptors
Browse files Browse the repository at this point in the history
  • Loading branch information
exarkun authored Oct 11, 2022
2 parents 55789f2 + 8cb7594 commit 4b8250c
Show file tree
Hide file tree
Showing 8 changed files with 222 additions and 213 deletions.
5 changes: 0 additions & 5 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
@@ -1,9 +1,4 @@
repos:
- repo: https://github.com/asottile/pyupgrade
rev: v2.23.1
hooks:
- id: pyupgrade
args: ["--py36-plus"]

- repo: https://github.com/pycqa/isort
rev: 5.9.3
Expand Down
7 changes: 4 additions & 3 deletions src/twisted/internet/_glibbase.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from twisted.internet import base, posixbase, selectreactor
from twisted.internet.interfaces import IReactorFDSet
from twisted.python import log
from ._signals import _UnixWaker


def ensureNotImported(moduleNames, errorMessage, preventImports=[]):
Expand Down Expand Up @@ -48,13 +49,13 @@ def ensureNotImported(moduleNames, errorMessage, preventImports=[]):
sys.modules[name] = None


class GlibWaker(posixbase._UnixWaker):
class GlibWaker(_UnixWaker):
"""
Run scheduled events after waking up.
"""

def doRead(self):
posixbase._UnixWaker.doRead(self)
def doRead(self) -> None:
super().doRead()
self.reactor._simulate()


Expand Down
204 changes: 204 additions & 0 deletions src/twisted/internet/_signals.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,19 @@
"""


import contextlib
import errno
import os
import signal
import socket

from zope.interface import Attribute, Interface, implementer

from twisted.python import failure, log, util
from twisted.python.runtime import platformType

if platformType == "posix":
from . import fdesc, process


def installHandler(fd):
Expand Down Expand Up @@ -66,3 +78,195 @@ def isDefaultHandler():
Determine whether the I{SIGCHLD} handler is the default or not.
"""
return signal.getsignal(signal.SIGCHLD) == signal.SIG_DFL


class _IWaker(Interface):
"""
Interface to wake up the event loop based on the self-pipe trick.
The U{I{self-pipe trick}<http://cr.yp.to/docs/selfpipe.html>}, used to wake
up the main loop from another thread or a signal handler.
This is why we have wakeUp together with doRead
This is used by threads or signals to wake up the event loop.
"""

disconnected = Attribute("")

def wakeUp():
"""
Called when the event should be wake up.
"""

def doRead():
"""
Read some data from my connection and discard it.
"""

def connectionLost(reason: failure.Failure) -> None:
"""
Called when connection was closed and the pipes.
"""


@implementer(_IWaker)
class _SocketWaker(log.Logger):
"""
The I{self-pipe trick<http://cr.yp.to/docs/selfpipe.html>}, implemented
using a pair of sockets rather than pipes (due to the lack of support in
select() on Windows for pipes), used to wake up the main loop from
another thread.
"""

disconnected = 0

def __init__(self, reactor):
"""Initialize."""
self.reactor = reactor
# Following select_trigger (from asyncore)'s example;
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
with contextlib.closing(
socket.socket(socket.AF_INET, socket.SOCK_STREAM)
) as server:
server.bind(("127.0.0.1", 0))
server.listen(1)
client.connect(server.getsockname())
reader, clientaddr = server.accept()
client.setblocking(0)
reader.setblocking(0)
self.r = reader
self.w = client
self.fileno = self.r.fileno

def wakeUp(self):
"""Send a byte to my connection."""
try:
util.untilConcludes(self.w.send, b"x")
except OSError as e:
if e.args[0] != errno.WSAEWOULDBLOCK:
raise

def doRead(self):
"""
Read some data from my connection.
"""
try:
self.r.recv(8192)
except OSError:
pass

def connectionLost(self, reason):
self.r.close()
self.w.close()


class _FDWaker(log.Logger):
"""
The I{self-pipe trick<http://cr.yp.to/docs/selfpipe.html>}, used to wake
up the main loop from another thread or a signal handler.
L{_FDWaker} is a base class for waker implementations based on
writing to a pipe being monitored by the reactor.
@ivar o: The file descriptor for the end of the pipe which can be
written to wake up a reactor monitoring this waker.
@ivar i: The file descriptor which should be monitored in order to
be awoken by this waker.
"""

disconnected = 0

i = None
o = None

def __init__(self, reactor):
"""Initialize."""
self.reactor = reactor
self.i, self.o = os.pipe()
fdesc.setNonBlocking(self.i)
fdesc._setCloseOnExec(self.i)
fdesc.setNonBlocking(self.o)
fdesc._setCloseOnExec(self.o)
self.fileno = lambda: self.i

def doRead(self):
"""
Read some bytes from the pipe and discard them.
"""
fdesc.readFromFD(self.fileno(), lambda data: None)

def connectionLost(self, reason):
"""Close both ends of my pipe."""
if not hasattr(self, "o"):
return
for fd in self.i, self.o:
try:
os.close(fd)
except OSError:
pass
del self.i, self.o


@implementer(_IWaker)
class _UnixWaker(_FDWaker):
"""
This class provides a simple interface to wake up the event loop.
This is used by threads or signals to wake up the event loop.
"""

def wakeUp(self):
"""Write one byte to the pipe, and flush it."""
# We don't use fdesc.writeToFD since we need to distinguish
# between EINTR (try again) and EAGAIN (do nothing).
if self.o is not None:
try:
util.untilConcludes(os.write, self.o, b"x")
except OSError as e:
# XXX There is no unit test for raising the exception
# for other errnos. See #4285.
if e.errno != errno.EAGAIN:
raise


if platformType == "posix":
_Waker = _UnixWaker
else:
# Primarily Windows and Jython.
_Waker = _SocketWaker # type: ignore[misc,assignment]


class _SIGCHLDWaker(_FDWaker):
"""
L{_SIGCHLDWaker} can wake up a reactor whenever C{SIGCHLD} is
received.
"""

def __init__(self, reactor):
_FDWaker.__init__(self, reactor)

def install(self):
"""
Install the handler necessary to make this waker active.
"""
installHandler(self.o)

def uninstall(self):
"""
Remove the handler which makes this waker active.
"""
installHandler(-1)

def doRead(self):
"""
Having woken up the reactor in response to receipt of
C{SIGCHLD}, reap the process which exited.
This is called whenever the reactor notices the waker pipe is
writeable, which happens soon after any call to the C{wakeUp}
method.
"""
_FDWaker.doRead(self)
process.reapAllProcesses()
11 changes: 8 additions & 3 deletions src/twisted/internet/cfreactor.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,15 +44,20 @@
)

from twisted.internet.interfaces import IReactorFDSet
from twisted.internet.posixbase import _NO_FILEDESC, PosixReactorBase, _Waker
from twisted.internet.posixbase import _NO_FILEDESC, PosixReactorBase
from twisted.python import log

# We know that we're going to run on macOS so we can just pick the
# POSIX-appropriate waker. This also avoids having a dynamic base class and
# so lets more things get type checked.
from ._signals import _UnixWaker

_READ = 0
_WRITE = 1
_preserveSOError = 1 << 6


class _WakerPlus(_Waker):
class _WakerPlus(_UnixWaker):
"""
The normal Twisted waker will simply wake up the main loop, which causes an
iteration to run, which in turn causes L{ReactorBase.runUntilCurrent}
Expand All @@ -74,7 +79,7 @@ def doRead(self):
Wake up the loop and force C{runUntilCurrent} to run immediately in the
next timed iteration.
"""
result = _Waker.doRead(self)
result = super().doRead()
self.reactor._scheduleSimulate(True)
return result

Expand Down
Loading

0 comments on commit 4b8250c

Please sign in to comment.