forked from deluge-torrent/deluge
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtransfer.py
158 lines (128 loc) · 5.05 KB
/
transfer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
#
# Copyright (C) 2012 Bro <bro.development@gmail.com>
# Copyright (C) 2018 Andrew Resch <andrewresch@gmail.com>
#
# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with
# the additional special exception to link portions of this program with the OpenSSL library.
# See LICENSE for more details.
#
import logging
import struct
import zlib
import rencode
from twisted.internet.protocol import Protocol
log = logging.getLogger(__name__)
PROTOCOL_VERSION = 1
MESSAGE_HEADER_FORMAT = '!BI'
MESSAGE_HEADER_SIZE = struct.calcsize(MESSAGE_HEADER_FORMAT)
class DelugeTransferProtocol(Protocol):
"""
Deluge RPC wire protocol.
Data messages are transferred with a header containing a protocol version
and the length of the data to be transferred (payload).
The format is::
ubyte uint4 bytestring
|.version.|..size..|.....body.....|
The version is an unsigned byte that indicates the protocol version.
The size is a unsigned 32-bit integer that is equal to the length of the body bytestring.
The body is the compressed rencoded byte string of the data object.
"""
def __init__(self):
self._buffer = b'' # TODO: Look into using bytearray instead of byte string.
self._message_length = 0
self._bytes_received = 0
self._bytes_sent = 0
def transfer_message(self, data):
"""
Transfer the data.
:param data: data to be transferred in a data structure serializable by rencode.
"""
body = zlib.compress(rencode.dumps(data))
body_len = len(body)
message = struct.pack(
f'{MESSAGE_HEADER_FORMAT}{body_len}s',
PROTOCOL_VERSION,
body_len,
body,
)
self._bytes_sent += len(message)
self.transport.write(message)
def dataReceived(self, data): # NOQA: N802
"""
This method is called whenever data is received.
:param data: a message as transferred by transfer_message, or a part of such
a message.
Global variables:
_buffer - contains the data received
_message_length - the length of the payload of the current message.
"""
self._buffer += data
self._bytes_received += len(data)
while len(self._buffer) >= MESSAGE_HEADER_SIZE:
if self._message_length == 0:
self._handle_new_message()
# We have a complete packet
if len(self._buffer) >= self._message_length:
self._handle_complete_message(self._buffer[: self._message_length])
# Remove message data from buffer
self._buffer = self._buffer[self._message_length :]
self._message_length = 0
else:
break
def _handle_new_message(self):
"""
Handle the start of a new message. This method is called only when the
beginning of the buffer contains data from a new message (i.e. the header).
"""
try:
# Read the first bytes of the message (MESSAGE_HEADER_SIZE bytes)
header = self._buffer[:MESSAGE_HEADER_SIZE]
# Extract the length stored as an unsigned 32-bit integer
version, self._message_length = struct.unpack(MESSAGE_HEADER_FORMAT, header)
if version != PROTOCOL_VERSION:
raise Exception(
'Received invalid protocol version: {}. PROTOCOL_VERSION is {}.'.format(
version, PROTOCOL_VERSION
)
)
# Remove the header from the buffer
self._buffer = self._buffer[MESSAGE_HEADER_SIZE:]
except Exception as ex:
log.warning('Error occurred when parsing message header: %s.', ex)
log.warning(
'This version of Deluge cannot communicate with the sender of this data.'
)
self._message_length = 0
self._buffer = b''
def _handle_complete_message(self, data):
"""
Handles a complete message as it is transferred on the network.
:param data: a zlib compressed string encoded with rencode.
"""
try:
self.message_received(
rencode.loads(zlib.decompress(data), decode_utf8=True)
)
except Exception as ex:
log.warning(
'Failed to decompress (%d bytes) and load serialized data with rencode: %s',
len(data),
ex,
)
def get_bytes_recv(self):
"""
Returns the number of bytes received.
:returns: the number of bytes received
:rtype: int
"""
return self._bytes_received
def get_bytes_sent(self):
"""
Returns the number of bytes sent.
:returns: the number of bytes sent
:rtype: int
"""
return self._bytes_sent
def message_received(self, message):
"""Override this method to receive the complete message"""
pass