Skip to content

Commit

Permalink
Asynchronous Windows named pipes
Browse files Browse the repository at this point in the history
Non blocking poll.   1000 times faster than current implementation.  40% faster than sockets for local connections.
  • Loading branch information
pyscripter authored Nov 18, 2018
1 parent 8af751d commit e9c6bff
Showing 1 changed file with 85 additions and 8 deletions.
93 changes: 85 additions & 8 deletions rpyc/core/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
from rpyc.lib.compat import poll, select_error, BYTES_LITERAL, get_exc_errno, maxint
win32file = safe_import("win32file")
win32pipe = safe_import("win32pipe")
win32event = safe_import("win32event")
pywintypes = safe_import("pywintypes")
msvcrt = safe_import("msvcrt")
ssl = safe_import("ssl")

Expand Down Expand Up @@ -397,8 +399,8 @@ def write(self, data):
self.close()
raise EOFError(ex)

def poll(self, timeout, interval = 0.1):
"""a poor man's version of select()"""
def poll(self, timeout, interval = 0.001):
"""a Windows version of select()"""
timeout = Timeout(timeout)
try:
while True:
Expand All @@ -421,14 +423,21 @@ class NamedPipeStream(Win32PipeStream):
NAMED_PIPE_PREFIX = r'\\.\pipe\rpyc_'
PIPE_IO_TIMEOUT = 3
CONNECT_TIMEOUT = 3
__slots__ = ("is_server_side",)

def __init__(self, handle, is_server_side):
Win32PipeStream.__init__(self, handle, handle)
self.is_server_side = is_server_side
self.read_overlapped = pywintypes.OVERLAPPED()
self.read_overlapped.hEvent = win32event.CreateEvent(None,1,1,None)
self.write_overlapped = pywintypes.OVERLAPPED()
self.write_overlapped.hEvent = win32event.CreateEvent(None,1,1,None)
self.poll_buffer = win32file.AllocateReadBuffer(1)
self.poll_read = False

@classmethod
def from_std(cls):
raise NotImplementedError()

@classmethod
def create_pair(cls):
raise NotImplementedError()
Expand All @@ -449,8 +458,8 @@ def create_server(cls, pipename, connect = True):
pipename = cls.NAMED_PIPE_PREFIX + pipename
handle = win32pipe.CreateNamedPipe(
pipename,
win32pipe.PIPE_ACCESS_DUPLEX,
win32pipe.PIPE_TYPE_BYTE | win32pipe.PIPE_READMODE_BYTE | win32pipe.PIPE_WAIT,
win32pipe.PIPE_ACCESS_DUPLEX | win32file.FILE_FLAG_OVERLAPPED,
win32pipe.PIPE_TYPE_BYTE | win32pipe.PIPE_READMODE_BYTE,
1,
cls.PIPE_BUFFER_SIZE,
cls.PIPE_BUFFER_SIZE,
Expand All @@ -467,7 +476,8 @@ def connect_server(self):
until a connection arrives)"""
if not self.is_server_side:
raise ValueError("this must be the server side")
win32pipe.ConnectNamedPipe(self.incoming, None)
win32pipe.ConnectNamedPipe(self.incoming, self.write_overlapped)
win32event.WaitForSingleObject(self.write_overlapped.hEvent, win32event.INFINITE)

@classmethod
def create_client(cls, pipename):
Expand All @@ -488,7 +498,7 @@ def create_client(cls, pipename):
0,
None,
win32file.OPEN_EXISTING,
0,
win32file.FILE_FLAG_OVERLAPPED,
None
)
return cls(handle, False)
Expand All @@ -499,9 +509,76 @@ def close(self):
if self.is_server_side:
win32file.FlushFileBuffers(self.outgoing)
win32pipe.DisconnectNamedPipe(self.outgoing)

win32file.CloseHandle(self.read_overlapped.hEvent)
win32file.CloseHandle(self.write_overlapped.hEvent)
Win32PipeStream.close(self)

def read(self, count):
try:
if self.poll_read:
win32file.GetOverlappedResult(self.incoming, self.read_overlapped, 1)
data = [self.poll_buffer]
self.poll_read = False
count -= 1
else:
data = []
while count > 0:
hr, buf = win32file.ReadFile(self.incoming,
win32file.AllocateReadBuffer(int(min(self.MAX_IO_CHUNK, count))),
self.read_overlapped)
n = win32file.GetOverlappedResult(self.incoming, self.read_overlapped, 1)
count -= n
data.append(buf[:n])
except TypeError:
ex = sys.exc_info()[1]
if not self.closed:
raise
raise EOFError(ex)
except win32file.error:
ex = sys.exc_info()[1]
self.close()
raise EOFError(ex)
return BYTES_LITERAL("").join(data)

def write(self, data):
try:
while data:
dummy, count = win32file.WriteFile(self.outgoing, data[:self.MAX_IO_CHUNK], self.write_overlapped)
data = data[count:]
except TypeError:
ex = sys.exc_info()[1]
if not self.closed:
raise
raise EOFError(ex)
except win32file.error:
ex = sys.exc_info()[1]
self.close()
raise EOFError(ex)

def poll(self, timeout, interval = 0.001):
"""Windows version of select()"""
timeout = Timeout(timeout)
try:
if timeout.finite:
wait_time = int(max(1, timeout.timeleft() * 1000))
else:
wait_time = win32event.INFINITE

if not self.poll_read:
hr, self.poll_buffer = win32file.ReadFile(self.incoming,
self.poll_buffer,
self.read_overlapped)
self.poll_read = True;
if hr == 0:
return True
res = win32event.WaitForSingleObject(self.read_overlapped.hEvent, wait_time)
return res == win32event.WAIT_OBJECT_0
except TypeError:
ex = sys.exc_info()[1]
if not self.closed:
raise
raise EOFError(ex)

if sys.platform == "win32":
PipeStream = Win32PipeStream

0 comments on commit e9c6bff

Please sign in to comment.