forked from cedricp/ddt4all
-
Notifications
You must be signed in to change notification settings - Fork 0
/
usbdevice.py
182 lines (146 loc) · 5.7 KB
/
usbdevice.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
#!/usr/bin/python3
# -*- coding: utf-8 -*-
import array
import time
from usb import util, core, legacy
import elm
# HID class request codes (used as bRequest) and report type.
USBRQ_HID_GET_REPORT = 0x01
USBRQ_HID_SET_REPORT = 0x09
# NOTE(review): the USB HID specification numbers report types
# Input=1, Output=2, Feature=3 -- confirm 0x02 is what the firmware expects.
USB_HID_REPORT_TYPE_FEATURE = 0x02

# Custom vendor request codes (used as bRequest).
CUSTOM_RQ_SET_STATUS = 0x01
CUSTOM_RQ_GET_STATUS = 0x02

# Vendor request selectors (passed to the adapter as wIndex).
VENDOR_CAN_MODE = 0x00
VENDOR_CAN_TX = 0x01
VENDOR_CAN_RX = 0x02
VENDOR_CAN_RX_SIZE = 0x03

# Values accepted by the VENDOR_CAN_MODE selector.
CAN_MONITOR_MODE = 0x00
CAN_ISOTP_MODE = 0x01

# Pre-built bmRequestType values for class requests addressed to the device:
# one for host-to-device (OUT) and one for device-to-host (IN) transfers.
REQUEST_TYPE_SEND = util.build_request_type(
    util.CTRL_OUT,
    util.CTRL_TYPE_CLASS,
    util.CTRL_RECIPIENT_DEVICE,
)
REQUEST_TYPE_RECV = util.build_request_type(
    util.CTRL_IN,
    util.CTRL_TYPE_CLASS,
    util.CTRL_RECIPIENT_DEVICE,
)
class UsbCan:
    """Wrapper around the USB CAN adapter, driven through pyusb control transfers.

    The adapter is looked up by vendor id 0x16c0 / product id 0x05df when the
    object is created. Use is_init() to find out whether it was found before
    calling any transfer method.
    """

    def __init__(self):
        self.device = None
        self.descriptor = ""
        self.init()

    def init(self):
        """Locate the adapter and cache its string descriptor.

        Returns True when the adapter was found, False otherwise (in which
        case self.device is None).
        """
        self.device = core.find(idVendor=0x16c0, idProduct=0x05df)
        if not self.device:
            self.device = None
            return False
        self.descriptor = self.get_string_descriptor()
        print("Found USB adapter : %s" % self.descriptor)
        return True

    def is_init(self):
        """Return True when an adapter has been found."""
        return self.device is not None

    @staticmethod
    def _format_hex(data):
        """Render an iterable of byte values as upper-case hex pairs joined by spaces."""
        return " ".join("%02X" % value for value in data)

    @staticmethod
    def _decode_string_descriptor(response):
        """Return the text carried by a raw USB string descriptor.

        The first two bytes (bLength, bDescriptorType) are skipped; the rest
        is decoded as UTF-16LE, the encoding USB string descriptors use.
        """
        return bytes(response[2:]).decode('utf-16-le')

    @staticmethod
    def _decode_length(raw):
        """Decode the adapter's receive-size reply (big-endian byte sequence).

        Returns -1 for the 0xFFFF sentinel or an empty reply, 0 for the
        0xFFFE sentinel, and the decoded value otherwise.
        """
        if len(raw) == 0:
            # Nothing came back: treat it like the adapter's error sentinel
            # instead of failing on an empty hex string.
            return -1
        length = int.from_bytes(bytes(raw), "big")
        if length == 0xFFFF:
            return -1
        if length == 0xFFFE:
            return 0
        return length

    def get_string_descriptor(self):
        """Fetch a string descriptor from the device and return it as text."""
        # NOTE(review): wValue is DESC_TYPE_STRING itself rather than
        # (DESC_TYPE_STRING << 8) | index -- confirm against the firmware.
        response = self.device.ctrl_transfer(util.ENDPOINT_IN,
                                             legacy.REQ_GET_DESCRIPTOR,
                                             util.DESC_TYPE_STRING,
                                             0,    # language id
                                             255)  # length
        # array.tostring() was removed in Python 3.9; bytes() works on every
        # Python 3 version.
        return self._decode_string_descriptor(response)

    def get_vendor_request(self, vendor_id, length=None):
        """Issue an IN vendor request (CUSTOM_RQ_GET_STATUS) and return the reply.

        vendor_id is sent as wIndex. length is the maximum number of bytes to
        read; it is passed straight to pyusb as data_or_wLength.
        NOTE(review): with the default (None) pyusb presumably requests no
        data bytes, so callers that need a value should pass a length.
        """
        return self.device.ctrl_transfer(bmRequestType=util.CTRL_IN | util.CTRL_TYPE_VENDOR,
                                         bRequest=CUSTOM_RQ_GET_STATUS,
                                         wIndex=vendor_id,
                                         data_or_wLength=length)

    def set_vendor_request(self, vendor_id, value):
        """Issue an OUT vendor request (CUSTOM_RQ_SET_STATUS).

        vendor_id is sent as wIndex and value as wValue. Returns whatever
        pyusb returns for an OUT transfer (the number of payload bytes written).
        """
        return self.device.ctrl_transfer(bmRequestType=util.CTRL_OUT | util.CTRL_TYPE_VENDOR,
                                         bRequest=CUSTOM_RQ_SET_STATUS,
                                         wIndex=vendor_id,
                                         wValue=value)

    def get_data(self, length=511):
        """Read up to length bytes from the adapter and return them as a hex string."""
        response = self.device.ctrl_transfer(bmRequestType=REQUEST_TYPE_RECV,
                                             bRequest=USBRQ_HID_GET_REPORT,
                                             data_or_wLength=length)
        return self._format_hex(response)

    def set_data(self, data):
        """Send data to the adapter and return the number of bytes written.

        pyusb returns an integer byte count for OUT transfers, so there is no
        response payload to format here.
        """
        # NOTE(review): this OUT transfer uses USBRQ_HID_GET_REPORT although
        # USBRQ_HID_SET_REPORT is defined -- confirm which one the firmware expects.
        return self.device.ctrl_transfer(bmRequestType=REQUEST_TYPE_SEND,
                                         bRequest=USBRQ_HID_GET_REPORT,
                                         data_or_wLength=data)

    def get_read_buffer_length(self):
        """Return the number of bytes waiting in the adapter's receive buffer.

        Returns -1 when the adapter reports an error (0xFFFF) or sends no
        reply, and 0 when it reports an empty buffer (0xFFFE).
        """
        # The size has to travel device-to-host, so an IN request is needed;
        # an OUT request only yields a byte count from pyusb.
        # NOTE(review): assumes the firmware answers CUSTOM_RQ_GET_STATUS with
        # a 2-byte big-endian size for VENDOR_CAN_RX_SIZE -- confirm.
        raw = self.get_vendor_request(VENDOR_CAN_RX_SIZE, 2)
        return self._decode_length(raw)

    def get_buffer(self, timeout=500):
        """Poll the adapter until data is available and return it as a hex string.

        timeout is in milliseconds. Returns "WRONG RESPONSE" when the adapter
        reports an error and "TIMEOUT" when no data arrived in time.
        """
        # monotonic() is immune to wall-clock adjustments during the wait.
        deadline = time.monotonic() + timeout / 1000
        while True:
            pending = self.get_read_buffer_length()
            if pending == -1:
                return "WRONG RESPONSE"
            if pending > 0:
                return self.get_data(pending)
            if time.monotonic() > deadline:
                return "TIMEOUT"

    def set_tx_addr(self, addr):
        """Set the CAN transmit address on the adapter."""
        self.set_vendor_request(VENDOR_CAN_TX, addr)

    def set_rx_addr(self, addr):
        """Set the CAN receive address on the adapter."""
        self.set_vendor_request(VENDOR_CAN_RX, addr)

    def get_tx_addr(self):
        """Query the adapter for its CAN transmit address (raw reply)."""
        return self.get_vendor_request(VENDOR_CAN_TX)

    def get_rx_addr(self):
        """Query the adapter for its CAN receive address (raw reply)."""
        return self.get_vendor_request(VENDOR_CAN_RX)

    def set_can_mode_isotp(self):
        """Switch the adapter to CAN_ISOTP_MODE."""
        self.set_vendor_request(VENDOR_CAN_MODE, CAN_ISOTP_MODE)

    def set_can_mode_monitor(self):
        """Switch the adapter to CAN_MONITOR_MODE."""
        self.set_vendor_request(VENDOR_CAN_MODE, CAN_MONITOR_MODE)
class OBDDevice:
    """Diagnostic device front-end that talks to the ECU through a UsbCan adapter."""

    def __init__(self):
        # Set every attribute up front so the object is usable (and
        # inspectable) even when no adapter is attached.
        self.currentaddress = 0x00
        self.startSession = ""
        self.rsp_cache = {}
        self.device = UsbCan()
        # UsbCan() never returns None; ask the adapter whether it was found.
        self.connectionStatus = self.device.is_init()

    @staticmethod
    def _parse_hex(text):
        """Convert a hex request string into an array of byte values.

        Accepts whitespace-separated tokens ("10 C0") as well as packed hex
        ("10C0"). Tokens of one or two digits are one byte each; longer
        tokens are split into byte pairs.

        Raises ValueError when text holds no hex digits or is malformed.
        """
        tokens = text.split()
        if not tokens:
            raise ValueError("empty request")
        payload = array.array('B')
        for token in tokens:
            if len(token) > 2:
                payload.frombytes(bytes.fromhex(token))
            else:
                payload.append(int(token, 16))
        return payload

    def request(self, req, positive='', cache=True, serviceDelay="0"):
        """Send the hex request req and return the adapter's reply.

        The reply is the hex string produced by the adapter's get_buffer()
        (or its "TIMEOUT" / "WRONG RESPONSE" marker). positive, cache and
        serviceDelay are accepted but not used by this implementation.
        """
        self.device.set_data(self._parse_hex(req))
        return self.device.get_buffer(500)

    def close_protocol(self):
        """No-op in this implementation."""
        pass

    def start_session_can(self, start_session):
        """Send the start-session request and report whether it was accepted.

        Returns True when the reply starts with '50', False otherwise
        (including timeouts and adapter errors).
        """
        self.startSession = start_session
        # Send real bytes (not the raw string) and read the reply from the
        # adapter; the previous code read from a non-existent self.data.
        reply = self.request(start_session)
        return reply.startswith('50')

    def init_can(self):
        """Put the adapter into ISO-TP mode."""
        self.device.set_can_mode_isotp()

    def clear_cache(self):
        """Clear L2 cache before screen update."""
        self.rsp_cache = {}

    def set_can_addr(self, addr, ecu, canline=0):
        """Program the adapter's CAN transmit / receive addresses.

        When ecu carries both 'idTx' and 'idRx' those are used; otherwise the
        addresses are looked up from elm.dnat / elm.snat by addr. canline is
        accepted but not used by this implementation.
        """
        if 'idTx' in ecu and 'idRx' in ecu:
            tx_id = ecu['idTx']
            rx_id = ecu['idRx']
            self.currentaddress = elm.get_can_addr(tx_id)
        else:
            # NOTE(review): currentaddress is left untouched on this path --
            # confirm whether it should follow addr.
            tx_id = elm.dnat[addr]
            rx_id = elm.snat[addr]
        self.device.set_tx_addr(tx_id)
        self.device.set_rx_addr(rx_id)
if __name__ == '__main__':
    # Manual smoke test: open the adapter, select ISO-TP mode, set the CAN
    # addresses for address 26 and try to start a diagnostic session.
    dev = OBDDevice()
    if not dev.device.is_init():
        # Without an adapter every transfer would fail on a None device.
        raise SystemExit("USB adapter not found")
    dev.init_can()
    dev.set_can_addr(26, {})
    print(dev.start_session_can("10C0"))