forked from Flexget/Flexget
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathipc.py
159 lines (130 loc) · 5.34 KB
/
ipc.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
from __future__ import absolute_import, division, unicode_literals
import logging
import random
import string
import threading
import rpyc
from rpyc.utils.server import ThreadedServer
from flexget.logger import console, capture_output
from flexget.options import get_parser, ParserError
log = logging.getLogger('ipc')
# Allow some attributes from dict interface to be called over the wire
rpyc.core.protocol.DEFAULT_CONFIG['safe_attrs'].update(['items'])
rpyc.core.protocol.DEFAULT_CONFIG['allow_pickle'] = True
IPC_VERSION = 3
AUTH_ERROR = 'authentication error'
AUTH_SUCCESS = 'authentication success'
class RemoteStream(object):
"""
Used as a filelike to stream text to remote client. If client disconnects while this is in use, an error will be
logged, but no exception raised.
"""
def __init__(self, writer):
"""
:param writer: A function which writes a line of text to remote client.
"""
self.buffer = None
self.writer = writer
def write(self, data):
# This relies on all data up to a newline being either unicode or str, not mixed
if not self.buffer:
self.buffer = data
else:
self.buffer += data
newline = b'\n' if isinstance(self.buffer, str) else '\n'
if newline in self.buffer:
self.flush()
def flush(self):
if self.buffer is None or self.writer is None:
return
try:
self.writer(self.buffer, end='')
except EOFError:
self.writer = None
log.error('Client ended connection while still streaming output.')
finally:
self.buffer = None
class DaemonService(rpyc.Service):
# This will be populated when the server is started
manager = None
def exposed_version(self):
return IPC_VERSION
def exposed_handle_cli(self, args):
args = rpyc.utils.classic.obtain(args)
log.verbose('Running command `%s` for client.' % ' '.join(args))
parser = get_parser()
try:
options = parser.parse_args(args, file=self.client_out_stream)
except SystemExit as e:
if e.code:
# TODO: Not sure how to properly propagate the exit code back to client
log.debug('Parsing cli args caused system exit with status %s.' % e.code)
return
if not options.cron:
with capture_output(self.client_out_stream, loglevel=options.loglevel):
self.manager.handle_cli(options)
else:
self.manager.handle_cli(options)
def client_console(self, text):
self._conn.root.console(text)
@property
def client_out_stream(self):
return RemoteStream(self._conn.root.console)
class ClientService(rpyc.Service):
def on_connect(self):
"""Make sure the client version matches our own."""
daemon_version = self._conn.root.version()
if IPC_VERSION != daemon_version:
self._conn.close()
raise ValueError('Daemon is different version than client.')
def exposed_version(self):
return IPC_VERSION
def exposed_console(self, text, *args, **kwargs):
console(text, *args, **kwargs)
class IPCServer(threading.Thread):
def __init__(self, manager, port=None):
super(IPCServer, self).__init__(name='ipc_server')
self.daemon = True
self.manager = manager
self.host = '127.0.0.1'
self.port = port or 0
self.password = ''.join(random.choice(string.ascii_letters + string.digits) for x in range(15))
self.server = None
def authenticator(self, sock):
channel = rpyc.Channel(rpyc.SocketStream(sock))
password = channel.recv()
if password != self.password:
channel.send(AUTH_ERROR)
raise rpyc.utils.authenticators.AuthenticationError('Invalid password from client.')
channel.send(AUTH_SUCCESS)
return sock, self.password
def run(self):
# Make the rpyc logger a bit quieter when we aren't in debugging.
rpyc_logger = logging.getLogger('ipc.rpyc')
if logging.getLogger().getEffectiveLevel() > logging.DEBUG:
rpyc_logger.setLevel(logging.WARNING)
DaemonService.manager = self.manager
self.server = ThreadedServer(
DaemonService, hostname=self.host, port=self.port, authenticator=self.authenticator, logger=rpyc_logger
)
# If we just chose an open port, write save the chosen one
self.port = self.server.listener.getsockname()[1]
self.manager.write_lock(ipc_info={'port': self.port, 'password': self.password})
self.server.start()
def shutdown(self):
if self.server:
self.server.close()
class IPCClient(object):
def __init__(self, port, password):
channel = rpyc.Channel(rpyc.SocketStream.connect('127.0.0.1', port))
channel.send(password)
response = channel.recv()
if response == AUTH_ERROR:
# TODO: What to raise here. I guess we create a custom error
raise ValueError('Invalid password for daemon')
self.conn = rpyc.utils.factory.connect_channel(channel, service=ClientService)
def close(self):
self.conn.close()
def __getattr__(self, item):
"""Proxy all other calls to the exposed daemon service."""
return getattr(self.conn.root, item)