# -*- coding: utf-8 -*-
import math, string
import options
import elm
import zipfile
from xml.dom.minidom import parse
import xml.dom.minidom
import json, os
import re
import glob
import argparse
from StringIO import StringIO
__author__ = "Cedric PAILLE"
__copyright__ = "Copyright 2016-2018"
__credits__ = []
__license__ = "GPL"
__version__ = "1.0.0"
__maintainer__ = "Cedric PAILLE"
__email__ = "cedricpaille@gmail.com"
__status__ = "Beta"
# Returns signed value from 16 bits (2 bytes)
def hex16_tosigned(value):
return -(value & 0x8000) | (value & 0x7fff)
# Returns signed value from 8 bits (1 byte)
def hex8_tosigned(value):
return -(value & 0x80) | (value & 0x7f)
def cleanhtml(raw_html):
cleanr = re.compile('<.*?>')
cleantext = re.sub(cleanr, '', raw_html)
return cleantext
def getChildNodesByName(parent, name):
nodes = []
for node in parent.childNodes:
if node.nodeType == node.ELEMENT_NODE and node.localName == name:
nodes.append(node)
return nodes
class Data_item:
def __init__(self, item, req_endian, name = ''):
self.firstbyte = 0
self.bitoffset = 0
self.ref = False
self.endian = ''
self.req_endian = req_endian
if isinstance(item, dict):
self.name = name
if 'firstbyte' in item:
self.firstbyte = item['firstbyte']
if 'bitoffset' in item:
self.bitoffset = item['bitoffset']
if 'ref' in item:
self.ref = item['ref']
if 'endian' in item:
self.endian = item['endian']
else:
self.name = item.getAttribute("Name")
fb = item.getAttribute("FirstByte")
if fb:
self.firstbyte = int(fb)
bo = item.getAttribute("BitOffset")
if bo:
self.bitoffset = int(bo)
endian = item.getAttribute("Endian")
if endian:
self.endian = endian.encode('ascii')
ref = item.getAttribute("Ref")
if ref and ref == '1':
self.ref = True
def dump(self):
js = {}
if self.firstbyte != 0: js['firstbyte'] = self.firstbyte
if self.bitoffset != 0: js['bitoffset'] = self.bitoffset
if self.ref != False:
js['ref'] = self.ref
if self.endian != '':
js['endian'] = self.endian
return js
class Ecu_device:
def __init__(self, dev):
self.dtc = 0
self.dtctype = 0
self.devicedata = {}
self.name = ''
if isinstance(dev, dict):
# Json data
self.name = dev['name']
self.dtctype = dev['dtctype']
self.devicedata = dev['devicedata']
self.dtc = dev['dtc']
else:
self.name = dev.getAttribute("Name")
dtc = dev.getAttribute("DTC")
if dtc: self.dtc = int(dtc)
dtctype = dev.getAttribute("Type")
if dtctype: self.dtctype = int(dtctype)
devicedata = dev.getElementsByTagName("DeviceData")
if devicedata:
for data in devicedata:
name = data.getAttribute("Name")
failureflag = data.getAttribute("FailureFlag")
self.devicedata[name] = failureflag
def dump(self):
js = {}
js['dtc'] = self.dtc
js['dtctype'] = self.dtctype
js['devicedata'] = self.devicedata
js['name'] = self.name
return js
class Ecu_request:
def __init__(self, data, ecu_file):
self.minbytes = 0
self.shiftbytescount = 0
self.replybytes = ''
self.manualsend = False
self.sentbytes = ''
self.dataitems = {}
self.sendbyte_dataitems = {}
self.name = ''
self.ecu_file = ecu_file
# StartDiagSession requirements
# Seems relatively useless...
self.sds = {'nosds': True,
'plant': True,
'aftersales': True,
'engineering': True,
'supplier': True}
if isinstance(data, dict):
if data.has_key('minbytes'): self.minbytes = data['minbytes']
if data.has_key('shiftbytescount'): self.shiftbytescount = data['shiftbytescount']
if data.has_key('replybytes'): self.replybytes = data['replybytes']
if data.has_key('manualsend'): self.manualsend = data['manualsend']
if data.has_key('sentbytes'): self.sentbytes = data['sentbytes']
self.name = data['name']
if 'deny_sds' in data:
if 'nosds' in data['deny_sds']:
self.sds['nosds'] = False
if 'plant' in data['deny_sds']:
self.sds['plant'] = False
if 'aftersales' in data['deny_sds']:
self.sds['aftersales'] = False
if 'engineering' in data['deny_sds']:
self.sds['engineering'] = False
if 'supplier' in data['deny_sds']:
self.sds['supplier'] = False
if data.has_key('sendbyte_dataitems'):
sbdi = data['sendbyte_dataitems']
for k, v in sbdi.iteritems():
di = Data_item(v, self.ecu_file.endianness, k)
self.sendbyte_dataitems[k] = di
if data.has_key('receivebyte_dataitems'):
rbdi = data['receivebyte_dataitems']
for k, v in rbdi.iteritems():
di = Data_item(v, self.ecu_file.endianness, k)
self.dataitems[k] = di
elif isinstance(data, unicode):
# Create a blank, new one
self.name = data
else:
self.xmldoc = data
self.name = self.xmldoc.getAttribute("Name")
accessdata = self.xmldoc.getElementsByTagName("DenyAccess").item(0)
if accessdata:
for accessdatachild in accessdata.childNodes:
if accessdatachild.nodeName == "NoSDS":
self.sds['nosds'] = False
elif accessdatachild.nodeName == "Plant":
self.sds['plant'] = False
elif accessdatachild.nodeName == "AfterSales":
self.sds['aftersales'] = False
elif accessdatachild.nodeName == "Engineering":
self.sds['engineering'] = False
elif accessdatachild.nodeName == "Supplier":
self.sds['supplier'] = False
manualsenddata = self.xmldoc.getElementsByTagName("ManuelSend").item(0)
if manualsenddata:
self.manualsend = True
shiftbytescount = self.xmldoc.getElementsByTagName("ShiftBytesCount")
if shiftbytescount:
self.shiftbytescount = int(shiftbytescount.item(0).firstChild.nodeValue)
replybytes = self.xmldoc.getElementsByTagName("ReplyBytes")
if replybytes:
self.replybytes = replybytes.item(0).firstChild.nodeValue
receiveddata = self.xmldoc.getElementsByTagName("Received").item(0)
if receiveddata:
minbytes = receiveddata.getAttribute("MinBytes")
if minbytes:
self.minbytes = int(minbytes)
dataitems = receiveddata.getElementsByTagName("DataItem")
if dataitems:
for dataitem in dataitems:
di = Data_item(dataitem, self.ecu_file.endianness)
self.dataitems[di.name] = di
sentdata = self.xmldoc.getElementsByTagName("Sent")
if sentdata:
sent = sentdata.item(0)
sentbytesdata = sent.getElementsByTagName("SentBytes")
if sentbytesdata:
if sentbytesdata.item(0).firstChild:
self.sentbytes = sentbytesdata.item(0).firstChild.nodeValue
dataitems = sent.getElementsByTagName("DataItem")
if dataitems:
for dataitem in dataitems:
di = Data_item(dataitem, self.ecu_file.endianness)
self.sendbyte_dataitems[di.name] = di
def send_request(self, inputvalues={}, test_data=None):
request_stream = self.build_data_stream(inputvalues)
request_stream = " ".join(request_stream)
if options.debug:
print "Generated stream ", request_stream
if options.simulation_mode:
if test_data is not None:
elmstream = test_data
print "Send request stream", request_stream
else:
# return default reply bytes...
elmstream = self.replybytes
else:
elmstream = options.elm.request(request_stream)
if options.debug:
print "Received stream ", elmstream
if elmstream.startswith('WRONG RESPONSE'):
return None
if elmstream.startswith('7F'):
nrsp = options.elm.errorval(elmstream[6:8])
print "Request ECU Error", nrsp
return None
values = self.get_values_from_stream(elmstream)
if options.debug:
print "Decoded values", values
return values
def get_data_inputs(self):
return self.sendbyte_dataitems.keys()
def build_data_stream(self, data):
data_stream = self.get_formatted_sentbytes()
for k, v in data.iteritems():
if k in self.sendbyte_dataitems:
datatitem = self.sendbyte_dataitems[k]
else:
raise KeyError('Ecurequest::build_data_stream : Data item %s does not exist' % k)
if k in self.ecu_file.data:
data = self.ecu_file.data[k]
else:
raise KeyError('Ecurequest::build_data_stream : Data %s does not exist' % k)
if v in data.items:
v = hex(data.items[v])[2:].upper()
data.setValue(v, data_stream, datatitem, self.ecu_file.endianness)
return data_stream
def get_values_from_stream(self, stream):
values = {}
for k, v in self.dataitems.iteritems():
if k in self.ecu_file.data:
data = self.ecu_file.data[k]
values[k] = data.getDisplayValue(stream, v, self.ecu_file.endianness)
else:
raise KeyError('Ecurequest::get_values_from_stream : Data %s does not exist' % k)
return values
def get_formatted_sentbytes(self):
bytes_to_send_ascii = self.sentbytes.encode('ascii', 'ignore')
return [bytes_to_send_ascii[i:i + 2] for i in range(0, len(bytes_to_send_ascii), 2)]
def dump(self):
js = {}
if self.minbytes != 0: js['minbytes'] = self.minbytes
if self.shiftbytescount != 0: js['shiftbytescount'] = self.shiftbytescount
if self.replybytes != '': js['replybytes'] = self.replybytes
if self.manualsend: js['manualsend'] = self.manualsend
if self.sentbytes != '': js['sentbytes'] = self.sentbytes
js['name'] = self.name
js['deny_sds'] = []
if self.sds['nosds'] == False:
js['deny_sds'].append('nosds')
if self.sds['plant'] == False:
js['deny_sds'].append('plant')
if self.sds['aftersales'] == False:
js['deny_sds'].append('aftersales')
if self.sds['engineering'] == False:
js['deny_sds'].append('engineering')
if self.sds['supplier'] == False:
js['deny_sds'].append('supplier')
sdi = {}
for key, value in self.sendbyte_dataitems.iteritems():
sdi[key] = value.dump()
if len(sdi):
js['sendbyte_dataitems'] = sdi
rdi = {}
for key, value in self.dataitems.iteritems():
rdi[key] = value.dump()
if len(rdi):
js['receivebyte_dataitems'] = rdi
return js
def dump_dataitems(self):
di = {}
for key, value in self.dataitems.iteritems():
di[key] = value.dump()
return di
def dump_sentdataitems(self):
di = {}
for key, value in self.sendbyte_dataitems.iteritems():
di[key] = value.dump()
return di
class Ecu_data:
def __init__(self, data, name=''):
self.bitscount = 8
self.scaled = False
self.signed = False
self.byte = False
self.binary = False
self.bytescount = 1
self.bytesascii = False
self.step = 1.0
self.offset = 0.0
self.divideby = 1.0
self.format = ""
self.lists = {}
self.items = {}
self.description = ''
self.unit = ""
self.comment = ''
self.name = name
if data:
self.init(data)
def init(self, data):
if isinstance(data, dict):
if 'bitscount' in data:
self.bitscount = data['bitscount']
if 'bytesascii' in data:
self.bytesascii = data['bytesascii']
if 'scaled' in data:
self.scaled = data['scaled']
if 'signed' in data:
self.signed = data['signed']
if 'byte' in data:
self.byte = data['byte']
if 'binary' in data:
self.binary = data['binary']
if 'step' in data:
self.step = data['step']
if 'offset' in data:
self.offset = data['offset']
if 'divideby' in data:
self.divideby = data['divideby']
if 'format' in data:
self.format = data['format']
if 'bytescount' in data:
self.bytescount = data['bytescount']
if 'unit' in data:
self.unit = data['unit']
if 'comment' in data:
self.comment = data['comment']
if data.has_key('lists'):
for k, v in data['lists'].iteritems():
self.lists[int(k)] = v
self.items[v] = int(k)
else:
self.xmldoc = data
self.name = self.xmldoc.getAttribute("Name")
description = self.xmldoc.getElementsByTagName("Description")
if description:
self.description = description.item(0).firstChild.nodeValue.replace('', '')
comment = self.xmldoc.getElementsByTagName("Comment")
if comment:
self.comment = comment.item(0).firstChild.nodeValue.replace('', '')
lst = self.xmldoc.getElementsByTagName("List")
if lst:
for l in lst:
items = l.getElementsByTagName("Item")
for item in items:
key = int(item.getAttribute('Value'))
val = item.getAttribute('Text')
self.lists[int(key)] = val
self.items[val] = int(key)
bytes = self.xmldoc.getElementsByTagName("Bytes")
if bytes:
self.byte = True
bytescount = bytes.item(0).getAttribute("count").replace(',', '.')
if '.' in bytescount:
self.bytescount = math.ceil(float(bytescount))
self.bitscount = int(float(bytescount) * 8)
elif bytescount:
self.bytescount = int(bytescount)
self.bitscount = self.bytescount * 8
bytesascii = bytes.item(0).getAttribute("ascii")
if bytesascii and bytesascii == '1': self.bytesascii = True
bits = self.xmldoc.getElementsByTagName("Bits")
if bits:
bitscount = bits.item(0).getAttribute("count")
if bitscount:
self.bitscount = int(bitscount)
self.bytescount = int(math.ceil(float(bitscount) / 8.0))
signed = bits.item(0).getAttribute("signed")
if signed:
self.signed = True
binary = bits.item(0).getElementsByTagName("Binary")
if binary:
self.binary = True
scaled_value = bits.item(0).getElementsByTagName("Scaled")
if scaled_value:
self.scaled = True
sc = scaled_value.item(0)
step = sc.getAttribute("Step")
if step:
self.step = float(step)
offset = sc.getAttribute("Offset")
if offset:
self.offset = float(offset)
divideby = sc.getAttribute("DivideBy")
if divideby:
self.divideby = float(divideby)
format = sc.getAttribute("Format")
if format:
self.format = format
unit = sc.getAttribute("Unit")
if unit:
self.unit = unit
def dump(self):
js = {}
if self.bitscount != 8:
js['bitscount'] = self.bitscount
if self.scaled != False:
js['scaled'] = self.scaled
if self.signed != False:
js['signed'] = self.signed
if self.byte != False:
js['byte'] = self.byte
if self.binary != False:
js['binary'] = self.binary
if self.bytescount != 1:
js['bytescount'] = self.bytescount
if self.bytesascii != False:
js['bytesascii'] = self.bytesascii
if self.step != 1:
js['step'] = self.step
if self.offset != 0:
js['offset'] = self.offset
if self.divideby != 1:
js['divideby'] = self.divideby
if self.format != '':
js['format'] = self.format
if len(self.lists) > 0:
lst = {}
for k, v in self.lists.iteritems():
lst[int(k)] = v
js['lists'] = lst
if self.unit != '':
js['unit'] = self.unit
if self.comment != '':
js['comment'] = cleanhtml(self.comment)
return self.name, js
def setValue(self, value, bytes_list, dataitem, ecu_endian, test_mode=False):
start_byte = dataitem.firstbyte - 1
start_bit = dataitem.bitoffset
little_endian = False
if ecu_endian == "Little":
little_endian = True
# It seems that DataItem can override Request endianness
if dataitem.endian == "Little":
little_endian = True
if dataitem.endian == "Big":
little_endian = False
if self.bytesascii:
value = str(value)
if self.bytescount > len(value):
value = value.ljust(self.bytescount)
if self.bytescount < len(value):
value = value[0:self.bytescount]
asciival = ""
for i in range(self.bytescount):
if not test_mode:
asciival += hex(ord(value[i]))[2:].upper()
else:
asciival += "FF"
value = asciival
if self.scaled:
if not test_mode:
try:
value = float(value)
except:
return None
# Input value must be base 10
value = int((value * float(self.divideby) - float(self.offset)) / float(self.step))
else:
value = int("0x" + value, 16)
else:
if not test_mode:
# Check input length and validity
if not all(c in string.hexdigits for c in value):
return None
# Value is base 16
value = int('0x' + str(value), 16)
else:
value = int("0x" + value, 16)
valueasbin = bin(value)[2:].zfill(self.bitscount)
numreqbytes = int(math.ceil(float(self.bitscount + start_bit) / 8.))
request_bytes = bytes_list[start_byte:start_byte + numreqbytes]
requestasbin = ""
for r in request_bytes:
requestasbin += bin(int(r, 16))[2:].zfill(8)
requestasbin = list(requestasbin)
if little_endian:
# Little endian coding is really weird :/
# Cannot figure out why it's being used
# But tried to do my best to mimic the read/write process
# Actually, need to do it in 3 steps
remainingbits = self.bitscount
# Step 1
lastbit = 7 - start_bit + 1
firstbit = lastbit - self.bitscount
if firstbit < 0:
firstbit = 0
count = 0
for i in range(firstbit, lastbit):
requestasbin[i] = valueasbin[count]
count += 1
remainingbits -= count
# Step 2
currentbyte = 1
while remainingbits >= 8:
for i in range(0, 8):
requestasbin[currentbyte * 8 + i] = valueasbin[count]
count += 1
remainingbits -= 8
currentbyte += 1
# Step 3
if remainingbits > 0:
lastbit = 8
firstbit = lastbit - remainingbits
for i in range(firstbit, lastbit):
requestasbin[currentbyte * 8 + i] = valueasbin[count]
count += 1
else:
for i in range(self.bitscount):
requestasbin[i + start_bit] = valueasbin[i]
requestasbin = "".join(requestasbin)
valueasint = int("0b" + requestasbin, 2)
valueashex = hex(valueasint)[2:].replace("L", "").zfill(numreqbytes * 2).upper()
for i in range(numreqbytes):
bytes_list[i + start_byte] = valueashex[i * 2:i * 2 + 2].zfill(2)
return bytes_list
def getDisplayValue(self, elm_data, dataitem, ecu_endian):
value = self.getHexValue(elm_data, dataitem, ecu_endian)
if value is None:
return None
if self.bytesascii:
return value.decode('hex')
# I think we want Hex format for non scaled values
if not self.scaled:
val = int('0x' + value, 16)
# Manage signed values
if self.signed:
if self.bytescount == 1:
val = hex8_tosigned(val)
elif self.bytescount == 2:
val = hex16_tosigned(val)
else:
print "Warning, cannot get signed value for %s" % dataitem.name
# Manage mapped values if exists
if val in self.lists:
return self.lists[val]
# Return default hex value
return value
value = int('0x' + value, 16)
# Manage signed values
if self.signed:
if self.bytescount == 1:
value = hex8_tosigned(value)
elif self.bytescount == 2:
value = hex16_tosigned(value)
if self.divideby == 0:
print "Division by zero, please check data item : ", dataitem.name
return None
res = (float(value) * float(self.step) + float(self.offset)) / float(self.divideby)
if len(self.format) and '.' in self.format:
acc = len(self.format.split('.')[1])
fmt = '%.' + str(acc) + 'f'
res = fmt % res
else:
if int(res) == res:
return str(int(res))
return str(res)
def getIntValue(self, resp, dataitem, ecu_endian):
val = self.getHexValue(resp, dataitem, ecu_endian)
if val is None:
return None
return int("0x" + val, 16)
def getHexValue(self, resp, dataitem, ecu_endian):
little_endian = False
if ecu_endian == "Little":
little_endian = True
if dataitem.endian == "Little":
little_endian = True
if dataitem.endian == "Big":
little_endian = False
# Data cleaning
resp = resp.strip().replace(' ', '')
if not all(c in string.hexdigits for c in resp): resp = ''
resp.replace(' ', '')
res_bytes = [resp[i:i + 2] for i in range(0, len(resp), 2)]
# Data count
startByte = dataitem.firstbyte
startBit = dataitem.bitoffset
bits = self.bitscount
databytelen = int(math.ceil(float(self.bitscount) / 8.0))
reqdatabytelen = int(math.ceil(float(self.bitscount + startBit) / 8.0))
sb = startByte - 1
if (sb * 2 + databytelen * 2) > (len(resp)):
return None
hexval = resp[sb * 2:(sb + reqdatabytelen) * 2]
hextobin = ""
for b in res_bytes[sb:sb + reqdatabytelen]:
hextobin += bin(int(b, 16))[2:].zfill(8)
if len(hexval) == 0:
return None
if little_endian:
# Don't like this method
totalremainingbits = bits
lastbit = 7 - startBit + 1
firstbit = lastbit - bits
if firstbit < 0:
firstbit = 0
tmp_bin = hextobin[firstbit:lastbit]
totalremainingbits -= lastbit - firstbit
if totalremainingbits > 8:
offset1 = 8
offset2 = offset1 + ((reqdatabytelen - 2) * 8)
tmp_bin += hextobin[offset1:offset2]
totalremainingbits -= offset2 - offset1
if totalremainingbits > 0:
offset1 = (reqdatabytelen - 1) * 8
offset2 = offset1 - totalremainingbits
tmp_bin += hextobin[offset2:offset1]
totalremainingbits -= offset1 - offset2
if totalremainingbits != 0:
print "getHexValue >> abnormal remaining bytes ", bits, totalremainingbits
hexval = hex(int("0b" + tmp_bin, 2))[2:].replace("L", "")
else:
valtmp = "0b" + hextobin[startBit:startBit + bits]
hexval = hex(int(valtmp, 2))[2:].replace("L", "")
# Resize to original length
hexval = hexval.zfill(databytelen * 2)
return hexval
class Ecu_file:
def __init__(self, data, isfile=False):
self.requests = {}
self.devices = {}
self.data = {}
self.endianness = ''
self.ecu_protocol = ''
self.ecu_send_id = "00"
self.ecu_recv_id = "00"
self.fastinit = False
self.kw1 = ""
self.kw2 = ""
self.funcname = ""
self.funcaddr = "00"
self.ecuname = ""
self.projects = []
self.autoidents = []
self.baudrate = 0
if not data:
return
if isfile:
if not os.path.exists(data):
if os.path.exists("./ecus/" + data + ".xml"):
data = "./ecus/" + data + ".xml"
if isfile and ".xml" not in data[-4:] and ".json" not in data[-5:]:
xmlname = data + ".xml"
if os.path.exists(xmlname):
data = xmlname
else:
data += ".json"
if isfile and '.json' in data:
data2 = "./json/" + os.path.basename(data)
jsdata = None
if os.path.exists(data):
jsfile = open(data, "r")
jsdata = jsfile.read()
jsfile.close()
elif os.path.exists(data2):
jsfile = open(data2, "r")
jsdata = jsfile.read()
jsfile.close()
else:
# Zipped json here
if os.path.exists('ecu.zip'):
zf = zipfile.ZipFile('ecu.zip', mode='r')
if data in zf.namelist():
jsdata = zf.read(data)
elif os.path.basename(data) in zf.namelist():
jsdata = zf.read(os.path.basename(data))
else:
print "Cannot find file ", data
return
if jsdata is None:
return
ecudict = json.loads(jsdata)
if "obd" in ecudict:
self.ecu_protocol = ecudict['obd']['protocol']
if self.ecu_protocol == "CAN":
self.ecu_send_id = ecudict['obd']['send_id']
self.ecu_recv_id = ecudict['obd']['recv_id']
if 'baudrate' in ecudict['obd']:
self.baudrate = int(ecudict['obd']['baudrate'])
if self.ecu_protocol == "KWP2000":
self.fastinit = ecudict['obd']['fastinit']
self.funcaddr = ecudict['obd']['funcaddr']
if 'funcname' in ecudict['obd']:
self.funcname = ecudict['obd']['funcname']
if "kw1" in ecudict['obd']:
self.kw1 = ecudict['obd']['kw1']
self.kw2 = ecudict['obd']['kw2']
if 'endian' in ecudict:
self.endianness = ecudict['endian']
if 'ecuname' in ecudict:
self.ecuname = ecudict['ecuname']
devices = ecudict['devices']
for device in devices:
ecu_dev = Ecu_device(device)
self.devices[ecu_dev.name] = ecu_dev
requests = ecudict['requests']
for request in requests:
ecu_req = Ecu_request(request, self)
self.requests[ecu_req.name] = ecu_req
datalist = ecudict['data']
for k, v in datalist.iteritems():
self.data[k] = Ecu_data(v, k)
else:
if isfile:
if not os.path.exists(data):
print "Cannot load ECU file", data
return
xdom = xml.dom.minidom.parse(data)
self.xmldoc = xdom.documentElement
else:
self.xmldoc = data
if not self.xmldoc:
print("XML not found")
return
target = getChildNodesByName(self.xmldoc, u"Target")
if target:
self.ecuname = target[0].getAttribute("Name")
autoidents = getChildNodesByName(target[0], u"AutoIdents")
if autoidents:
autoident = getChildNodesByName(autoidents[0], u"AutoIdent")
for ai in autoident:
autoident_dict = {}
try:
autoident_dict['diagversion'] = str(
ai.getAttribute("DiagVersion").replace(unichr(160), " ").encode("ascii", errors='ignore'))
autoident_dict['supplier'] = str(
ai.getAttribute("Supplier").replace(unichr(160), " ").encode("ascii", errors='ignore'))
autoident_dict['soft'] = str(
ai.getAttribute("Soft").replace(unichr(160), " ").encode("ascii", errors='ignore'))
autoident_dict['version'] = str(
ai.getAttribute("Version").replace(unichr(160), " ").encode("ascii", errors='ignore'))
except:
autoident_dict['diagversion'] = str(
ai.getAttribute("DiagVersion").replace(unichr(160), " ").encode("ascii"))
autoident_dict['supplier'] = str(
ai.getAttribute("Supplier").replace(unichr(160), " ").encode("ascii"))
autoident_dict['soft'] = str(
ai.getAttribute("Soft").replace(unichr(160), " ").encode("ascii"))
autoident_dict['version'] = str(
ai.getAttribute("Version").replace(unichr(160), " ").encode("ascii"))
self.autoidents.append(autoident_dict)
projects = getChildNodesByName(target[0], u"Projects")
if projects:
for project in projects[0].childNodes:
self.projects.append(project.nodeName)
functions = getChildNodesByName(target[0], u"Function")
if functions:
self.funcaddr = hex(int(functions[0].getAttribute("Address")))[2:].upper().zfill(2)
self.funcname = functions[0].getAttribute("Name")
can = getChildNodesByName(target[0], u"CAN")
if can:
self.ecu_protocol = "CAN"
self.baudrate = int(can[0].getAttribute("BaudRate"))
send_ids = getChildNodesByName(can[0], "SendId")
if send_ids:
send_id = send_ids[0]
can_id = getChildNodesByName(send_id, "CANId")
if can_id:
self.ecu_send_id = hex(int(can_id[0].getAttribute("Value")))[2:].upper()
rcv_ids = getChildNodesByName(can[0], "ReceiveId")
if rcv_ids:
rcv_id = rcv_ids[0]
can_id = getChildNodesByName(rcv_id, "CANId")
if can_id:
self.ecu_recv_id = hex(int(can_id[0].getAttribute("Value")))[2:].upper()
k = getChildNodesByName(target[0], u"K")
if k:
kwp = getChildNodesByName(k[0], u"KWP")
iso8 = getChildNodesByName(k[0], u"ISO8")
if kwp:
kwp = kwp[0]
self.ecu_protocol = u"KWP2000"
fastinit = getChildNodesByName(kwp, u"FastInit")
if fastinit:
self.fastinit = True
KW1 = getChildNodesByName(fastinit[0], "KW1")
KW2 = getChildNodesByName(fastinit[0], "KW2")
else:
iso8 = getChildNodesByName(kwp, u"ISO8")
KW1 = getChildNodesByName(iso8[0], "KW1")
KW2 = getChildNodesByName(iso8[0], "KW2")
self.kw1 = hex(int(KW1[0].getAttribute("Value")))[2:].upper()
self.kw2 = hex(int(KW2[0].getAttribute("Value")))[2:].upper()
elif iso8:
self.fastinit = False
self.ecu_protocol = "ISO8"
self.kw1 = hex(
int(getChildNodesByName(iso8[0], "KW1")[0].getAttribute("Value")))[2:].upper()
self.kw2 = hex(
int(getChildNodesByName(iso8[0], "KW2")[0].getAttribute("Value")))[2:].upper()
devices = self.xmldoc.getElementsByTagName("Device")
for d in devices:
ecu_dev = Ecu_device(d)
self.devices[ecu_dev.name] = ecu_dev
requests_tag = self.xmldoc.getElementsByTagName("Requests")
if requests_tag:
for request_tag in requests_tag:
endian = ''
endian_attr = request_tag.getAttribute("Endian")
if endian_attr:
endian = endian_attr.encode('ascii')
self.endianness = endian
requests = request_tag.getElementsByTagName("Request")
for f in requests:
ecu_req = Ecu_request(f, self)
self.requests[ecu_req.name] = ecu_req
data = self.xmldoc.getElementsByTagName("Data")
for f in data:
ecu_data = Ecu_data(f)
self.data[ecu_data.name] = ecu_data
def get_request(self, name):
if name in self.requests:
return self.requests[name]
for k, v in self.requests.iteritems():
if k.lower() == name.lower():
return v
return None
def connect_to_hardware(self, canline=0):
# Can
ecuname = self.ecuname.encode('ascii', errors='ignore')
if self.ecu_protocol == 'CAN':
short_addr = elm.get_can_addr(self.ecu_send_id)
if short_addr is None:
print "Cannot retrieve functionnal address of ECU %s @ %s" % (self.ecuname, self.ecu_send_id)
return False
ecu_conf = {'idTx': self.ecu_send_id, 'idRx': self.ecu_recv_id, 'ecuname': str(ecuname)}
if not options.simulation_mode:
if self.baudrate == 250000:
ecu_conf['brp'] = 1
options.elm.init_can()
options.elm.set_can_addr(short_addr, ecu_conf, canline)
# KWP 2000 Handling
elif self.ecu_protocol == 'KWP2000':
ecu_conf = {'idTx': '', 'idRx': '', 'ecuname': str(ecuname), 'protocol': 'KWP2000'}
options.opt_si = not self.fastinit
if not options.simulation_mode:
options.elm.init_iso()
options.elm.set_iso_addr(self.funcaddr, ecu_conf)
# ISO8 handling
elif self.ecu_protocol == 'ISO8':
ecu_conf = {'idTx': '', 'idRx': '', 'ecuname': str(ecuname), 'protocol': 'ISO8'}
if not options.simulation_mode:
options.elm.init_iso()
options.elm.set_iso8_addr(self.funcaddr, ecu_conf)
else:
return False
return True
def dump_idents(self):
idents = {}
idents["address"] = self.funcaddr
idents["group"] = self.funcname
idents["protocol"] = self.ecu_protocol
idents["projects"] = self.projects
idents["ecuname"] = self.ecuname
idents["autoidents"] = []
for ai in self.autoidents:
aidict = {"diagnostic_version": ai["diagversion"],
"supplier_code": ai["supplier"],
"soft_version": ai["soft"],
"version": ai["version"]}
idents["autoidents"].append(aidict)
return idents
def dumpJson(self):
js = {}
js['autoidents'] = self.autoidents
js['ecuname'] = self.ecuname
js['obd'] = {}
js['obd']['protocol'] = self.ecu_protocol
if self.ecu_protocol == "CAN":
js['obd']['send_id'] = self.ecu_send_id
js['obd']['recv_id'] = self.ecu_recv_id
js['obd']['baudrate'] = self.baudrate
if self.ecu_protocol == "KWP2000":
js['obd']['fastinit'] = self.fastinit
js['obd']['funcaddr'] = self.funcaddr
js['obd']['funcname'] = self.funcname
js['data'] = {}
js['requests'] = []
js['devices'] = []
if self.kw1:
js['obd']['kw1'] = self.kw1
if self.kw2:
js['obd']['kw2'] = self.kw2
if self.endianness:
js['endian'] = self.endianness
for key, value in self.data.iteritems():
name, d = value.dump()
js['data'][name] = d
for key, value in self.requests.iteritems():
js['requests'].append(value.dump())
for key, value in self.devices.iteritems():
js['devices'].append(value.dump())
dump = json.dumps(js, indent=1)
return re.sub('\n +', lambda match: '\n' + '\t' * (len(match.group().strip('\n')) / 2), dump)
# Protocols:
# KWP2000 FastInit MonoPoint ?ATSP 5?
# KWP2000 FastInit MultiPoint ?ATSP 5?
# KWP2000 Init 5 Baud Type I and II ?ATSP 4?
# DiagOnCAN ATSP 6
# CAN Messaging (125 kbps CAN) ?ATSP B?
# ISO8 ?ATSP 3?
class Ecu_ident:
def __init__(self, diagversion, supplier, soft, version, name, group, href, protocol, projects, address, zipped=False):
self.diagversion = diagversion
self.supplier = supplier
self.soft = soft
self.version = version
self.name = name
self.group = group
self.projects = projects
self.href = href
self.addr = address
if "CAN" in protocol.upper():
self.protocol = 'CAN'
elif "KWP" in protocol.upper():
self.protocol = 'KWP2000'
elif "ISO8" in protocol.upper():
self.protocol = 'ISO8'
else:
self.protocol = 'UNKNOWN'
self.hash = diagversion + supplier + soft + version
self.zipped = zipped
def checkWith(self, diagversion, supplier, soft, version, addr):
if self.diagversion == "":
return
supplier_strip = self.supplier.strip()
soft_strip = self.soft.strip()
version_strip = self.version.strip()
if int("0x" + self.diagversion, 16) != int("0x" + diagversion, 16):
return False
if supplier_strip != supplier.strip()[:len(supplier_strip)]:
return False
if soft_strip != soft.strip()[:len(soft_strip)]:
return False
if version_strip != version.strip()[:len(version_strip)]:
return False
self.addr = addr
return True
# Minimal checking
def checkApproximate(self, diagversion, supplier, soft, addr):
if self.diagversion == "":
return
if self.supplier.strip() != supplier.strip():
return False
if self.soft.strip() != soft.strip():
return False
self.addr = addr
return True
def dump(self):
js = {}
js['diagnostic_version'] = self.diagversion
js['supplier_code'] = self.supplier
js['soft_version'] = self.soft
js['version'] = self.version
js['group'] = self.group
js['projects'] = [p for p in self.projects]
js['protocol'] = self.protocol
js['address'] = self.addr
return js
class Ecu_database:
jsonfile = "json/ecus.zip"
def __init__(self, forceXML=False):
self.targets = []
self.vehiclemap = {}
self.numecu = 0
self.available_addr_kwp = []
self.available_addr_can = []
self.addr_group_mapping_long = {}
self.addr_group_mapping = {"02": u"Suspension pilotée", "51": u"Tableau de bord", "29": u"Climatisation",
"D2": u"GATEWAY", "00": u"CAN Vehicle Network", "1E": u"4WD", "01": u"ABS-VDC - ABS-ESP",
"95": u"EVC", "26": u"UCBIC/BFR", "60": u"HMD", "50": u"Tachometer", "A1": u"HFM",
"93": u"LBC", "6E": u"BVA", "04": u"Direction assistée", "68": u"PEB", "58": u"Navigation",
"2B": u"RADAR", "F7": u"LDCM", "08": u"TPMS", "C0": u"HFM", "13": u"Audio", "59": u"MIU",
"F8": u"RDCM", "24": u"ACC", "27": u"EMM", "A8": u"LBC2", "23": u"4WS", "11": u"ADAS-Sub",
"2E": u"UBP", "67": u"BCB", "0E": u"Aide au parking", "0D": u"Frein de parking électrique",
"28": u"CSHV", "FF": u"CAN2", "62": u"FCAM", "DA": u"EVC-HCM-VCM_29b", "E8": u"SVS", "2F": u"IKEY",
"64": u"SOW_right", "07": u"HLS", "D3": u"UDM", "77": u"DCM Renault", "86": u"AAU", "3A": u"AAM",
"4D": u"SCU", "DF": u"Cluster", "A5": u"DCM", "10": u"Injection NISSAN", "0B": u"ACC", "61": u"AVM",
"46": u"Engineering", "EA": u"TCASE", "87": u"C-Box", "1B": u"DIFF LOCK", "72": u"Lampes à décharge à droite 84",
"ED": u"Audio", "EC": u"TPAD", "1C": u"Pilotage capote", "37": u"Onduleur", "D0": u"GATEWAY",
"32": u"Superviseur", "A6": u"PDCM", "66": u"VCCU", "71": u"HLL_DDL2", "E9": u"EPS",
"25": u"IDM", "79": u"GPL", "E2": u"C-Display", "A7": u"PBD", "6B": u"BSW", "2D": u"ABS-VDC",
"97": u"PLC/PLGW", "DE": u"ASBMD", "31": u"Transpondeur", "63": u"SOW Left", "E6": u"SCCM",
"2A": u"ADP", "0F": u"HFCK", "EB": u"HU", "78": u"DCM", "73": u"Embrayage piloté",
"5B": u"ADAS Insulator", "5A": u"ODS_DDL2", "3F": u"Navigation", "81": u"VSP", "40": u"TSR_FRONTCAM",
"06": u"EMCU", "E1": u"CCU", "1A": u"Additional Heater", "E3": u"HMI GateWay",
"AE": u"UCBIC ISO8", "91": u"LBC (HEV) CPC", "09": u"MC HEV FSCM", "EE": u"Controlographe",
"52": u"Synthèse de la parole", "D1": u"UDM", "E7": u"SCRCM", "41": u"GATEWAY", "2C": u"Airbag",
"70": u"Lampes à décharge 84", "E4": u"IBS", "E0": u"HERMES", "7A": u"Injection",
"AB": u"Régulateur de vitesse (ISO 8)", "B0": u"Transpondeur (ISO8)", "82": u"WCGS"}
f = open("json/addressing.json", "r")
js = json.loads(f.read())
f.close()
for k, v in js.iteritems():
self.addr_group_mapping[k] = v[0]
self.addr_group_mapping_long[k] = v[1]
xmlfile = options.ecus_dir + "/eculist.xml"
jsonecu_files = glob.glob("json/*.json.targets")
for jsonecu_file in jsonecu_files:
self.numecu += 1
json_file = open(jsonecu_file, "r")
json_data = json_file.read()
json_file.close()
ecus_dict = json.loads(json_data)
for ecu_dict in ecus_dict:
href = jsonecu_file.replace(".targets", "")
name = os.path.basename(href)
# Fix typo bug
diagversion = ""
if 'diagnostic_version' in ecu_dict:
diagversion = ecu_dict['diagnostic_version']
else:
diagversion = ecu_dict['diagnotic_version']
addr = ecu_dict['address']
if 'KWP' in ecu_dict['protocol']:
if addr not in self.available_addr_kwp:
self.available_addr_kwp.append(str(addr))
elif 'CAN' in ecu_dict['protocol']:
if addr not in self.available_addr_can:
self.available_addr_can.append(str(addr))
if str(addr) not in self.addr_group_mapping:
print "Adding group ", addr, ecu_dict['group']
self.addr_group_mapping[str(addr)] = ecu_dict['group']
ecu_ident = Ecu_ident(diagversion, ecu_dict['supplier_code'],
ecu_dict['soft_version'], ecu_dict['version'],
name, ecu_dict['group'], href, ecu_dict['protocol'],
ecu_dict['projects'], addr)
for proj in ecu_dict['projects']:
projname = proj[0:3].upper()
if not projname in self.vehiclemap:
self.vehiclemap[projname] = []
self.vehiclemap[projname].append((ecu_dict['protocol'], addr))
self.targets.append(ecu_ident)
if os.path.exists("ecu.zip") and not forceXML:
zf = zipfile.ZipFile("ecu.zip", mode='r')
jsdb = zf.read("db.json")
dbdict = json.loads(jsdb)
for href, targetv in dbdict.iteritems():
self.numecu += 1
ecugroup = targetv['group']
ecuprotocol = targetv['protocol']
ecuprojects = targetv['projects']
ecuaddress = targetv['address']
ecuname = targetv['ecuname']
if 'KWP' in ecuprotocol:
if not ecuaddress in self.available_addr_kwp:
self.available_addr_kwp.append(str(ecuaddress))
elif 'CAN' in ecuprotocol:
if not ecuaddress in self.available_addr_can:
self.available_addr_can.append(str(ecuaddress))
if str(ecuaddress) not in self.addr_group_mapping:
self.addr_group_mapping[ecuaddress] = targetv['group']
if len(targetv['autoidents']) == 0:
ecu_ident = Ecu_ident("", "", "", "", ecuname, ecugroup, href, ecuprotocol,
ecuprojects, ecuaddress, True)
self.targets.append(ecu_ident)
else:
for target in targetv['autoidents']:
ecu_ident = Ecu_ident(target['diagnostic_version'], target['supplier_code'],
target['soft_version'], target['version'],
ecuname, ecugroup, href, ecuprotocol,
ecuprojects, ecuaddress, True)
self.targets.append(ecu_ident)
for proj in ecuprojects:
projname = proj[0:3].upper()
if not projname in self.vehiclemap:
self.vehiclemap[projname] = []
self.vehiclemap[projname].append((ecuprotocol, ecuaddress))
self.targets.append(ecu_ident)
if os.path.exists(xmlfile):
xdom = xml.dom.minidom.parse(xmlfile)
self.xmldoc = xdom.documentElement
if not self.xmldoc:
print "Unable to find eculist"
return
functions = self.xmldoc.getElementsByTagName("Function")
for function in functions:
targets = function.getElementsByTagName("Target")
address = function.getAttribute("Address")
address = hex(int(address))[2:].zfill(2).upper()
for target in targets:
group = target.getAttribute("group")
href = target.getAttribute("href")
name = target.getAttribute("Name")
protnode = target.getElementsByTagName("Protocol")
if protnode:
protocol = protnode[0].firstChild.nodeValue
if len(group) and (str(address) not in self.addr_group_mapping):
self.addr_group_mapping[str(address)] = group
if 'CAN' in protocol.upper():
if address not in self.available_addr_can:
self.available_addr_can.append(str(address))
elif 'KWP' in protocol.upper():
if address not in self.available_addr_kwp:
self.available_addr_kwp.append(str(address))
autoidents = target.getElementsByTagName("AutoIdents")
projectselems = target.getElementsByTagName("Projects")
projects = []
if projectselems:
for c in projectselems[0].childNodes:
projects.append(c.nodeName)
for autoident in autoidents:
self.numecu += 1
if len(autoident.getElementsByTagName("AutoIdent")) == 0:
ecu_ident = Ecu_ident("00", "??????", "0000", "0000", name, group, href, protocol,
projects, address)
self.targets.append(ecu_ident)
for ai in autoident.getElementsByTagName("AutoIdent"):
diagversion = ai.getAttribute("DiagVersion")
supplier = ai.getAttribute("Supplier")
soft = ai.getAttribute("Soft")
version = ai.getAttribute("Version")
ecu_ident = Ecu_ident(diagversion, supplier, soft, version, name, group, href, protocol,
projects, address)
self.targets.append(ecu_ident)
if projectselems:
for project in projectselems[0].childNodes:
projname = project.nodeName[0:3].upper()
if not projname in self.vehiclemap:
self.vehiclemap[projname] = []
self.vehiclemap[projname].append((ecu_ident.protocol, address))
def getTarget(self, name):
for t in self.targets:
if t.name == name:
return t
return None
def getTargets(self, name):
tgt = []
for t in self.targets:
if t.name == name:
tgt.append(t)
return tgt
def getTargetsByHref(self, href):
tgt = []
for t in self.targets:
if t.href == href:
tgt.append(t)
return tgt
def dump(self):
js = []
for t in self.targets:
if t.protocol == 'CAN' or t.protocol == 'KWP2000' or 'ISO8' == t.protocol:
js.append(t.dump())
return json.dumps(js, indent=1)
class Ecu_scanner:
def __init__(self):
self.totalecu = 0
self.ecus = {}
self.approximate_ecus = {}
self.ecu_database = Ecu_database()
self.num_ecu_found = 0
self.report_data = []
self.qapp = None
def getNumEcuDb(self):
return self.ecu_database.numecu
def getNumAddr(self):
return len(elm.dnat)
def addTarget(self, target):
self.ecus[target.name] = target
def clear(self):
self.totalecu = 0
self.ecus = {}
self.approximate_ecus = {}
self.num_ecu_found = 0
self.report_data = []
def identify_old(self, addr, label, force = False):
if not options.simulation_mode:
if not options.elm.start_session_can('10C0'):
return
if options.simulation_mode and force == False:
# Give scanner something to eat...
if addr == "04":
can_response = "61 80 30 36 32 36 52 35 37 31 31 35 32 31 36 52 01 99 00 00 00 00 02 00 00 88"
if addr == "51":
can_response = "61 80 82 00 45 15 05 08 32 31 33 21 11 31 39 09 00 09 06 02 05 01 0D 8D 39 00"
elif addr == "7A":
# Test approximate case
can_response = "61 80 82 00 44 66 27 44 32 31 33 82 00 38 71 38 00 A7 75 00 56 05 02 01 00 00"
else:
can_response = "7F 80"
else:
can_response = options.elm.request(req='2180', positive='61', cache=False)
self.check_ecu(can_response, label, addr, "CAN")
def identify_from_frame(self, addr, can_response):
self.check_ecu(can_response, None, addr, "CAN")
def identify_new(self, addr, label):
printable_chars = set(string.printable)
diagversion = ""
supplier = ""
soft_version = ''
soft = ""
can_response = ""
# Check diagversion
if not options.simulation_mode:
if not options.elm.start_session_can('1003'):
# Bad response of SDS, no need check old method (10C0)
return False
if options.simulation_mode:
# Give scanner something to eat...
if addr == '26':
can_response = "62 F1 A0 08"
elif addr == '13':
can_response = "62 F1 A0 0D"
elif addr == '26':
can_response = "62 F1 A0 08"
elif addr == '62':
can_response = "62 F1 A0 04"
elif addr == '01':
can_response = "62 F1 A0 04"
elif addr == '04':
can_response = "62 F1 A0 04"
else:
can_response = options.elm.request(req='22F1A0', positive='', cache=False)
if 'WRONG' in can_response:
return False
diagversion = can_response.replace(' ', '')[6:8]
# Check supplier ident
if options.simulation_mode:
# Give scanner something to eat...
if addr == '26':
can_response = "62 F1 8A 43 4F 4E 54 49 4E 45 4E 54 41 4C 20 41 55 54 4F 4D 4F 54 49 56 45 20 20 20 20 " \
"20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 " \
"20 20 20 20 20 20 20 20 20"
elif addr == '13':
can_response = "62 F1 8A 43 41 50"
elif addr == '26':
can_response = "62 F1 8A 43 4F 4E 54 49 4E 45 4E 54 41 4C 20 41 55 54 4F 4D 4F 54 49 56 45 20 20 20 20" \
"20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20" \
"20 20 20 20 20 20 20 20 20 FF FF"
elif addr == '62':
can_response = "62 F1 8A 41 46 4B"
elif addr == '01':
can_response = "62 F1 8A 43 41 53"
elif addr == '04':
can_response = "62 F1 8A 56 69 73 74 65 6F 6E 5F 4E 61 6D 65 73 74 6F 76 6F 5F 30 39 36 20 20 20 20"\
"20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20"\
"20 20 20 20 20 20 20 20 20 20 20 20 20"
else:
can_response = options.elm.request(req='22F18A', positive='', cache=False)
if 'WRONG' in can_response:
return False
supplier = can_response.replace(' ', '')[6:132].decode('hex')
supplier = filter(lambda x: x in printable_chars, supplier)
# Check soft number
if options.simulation_mode:
# Give scanner something to eat...
if addr == '26':
can_response = "62 F1 94 31 34 32 36 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 " \
"20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 " \
"20 20 20 20 20 20 20 20 20"
elif addr == '13':
can_response = "62 F1 94 32 32"
elif addr == '26':
can_response = "62 F1 94 31 34 32 36 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 FF FF FF FF FF FF"
elif addr == '62':
can_response = "62 F1 94 31 30 30 30 30 30 30 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 FF FF FF FF FF FF"
elif addr == '01':
can_response = "62 F1 94 4E 33 32 52 41 46 30 30 30 31 31 00 00 00 00 00 00"
elif addr == '04':
can_response = "62 F1 94 56 30 36 30 32 F1 94 56 30 36"
else:
can_response = options.elm.request(req='22F194', positive='', cache=False)
if 'WRONG' in can_response:
return False
soft = can_response.replace(' ', '')[6:38].decode('hex')
soft = filter(lambda x: x in printable_chars, soft)
# Check soft version
if options.simulation_mode:
# Give scanner something to eat...
if addr == '26':
can_response = "62 F1 95 31 30 30 30 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 " \
"20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 " \
"20 20 20 20 20 20 20 20 20"
elif addr == '13':
can_response = "62 F1 95 31 38 35 39 30 FF FF FF FF FF"
elif addr == '26':
can_response = "62 F1 95 46 30 37 2F 34 6F 00 00 00 03"
elif addr == '62':
can_response = "62 F1 95 30 35 30 31 30 30 30 32 31 37 30 30 FF FF FF FF FF"
elif addr == '01':
can_response = "62 F1 95 46 30 37 2F 34 6F 00 00 00 03"
elif addr == '04':
can_response = "62 F1 95 56 30 36 30 32 F1 95 56 30 36"
else:
can_response = options.elm.request(req='22F195', positive='', cache=False)
if 'WRONG' in can_response:
return False
# Remove unwanted non-ascii FF from string
soft_version = can_response.replace(' ', '')[6:38].decode('hex')
soft_version = filter(lambda x: x in printable_chars, soft_version)
if diagversion == "":
return False
self.check_ecu2(diagversion, supplier, soft, soft_version, label, addr, "CAN")
# New method succeded, return the good news
return True
def scan(self, progress=None, label=None, vehiclefilter=None, canline=0):
i = 0
if not options.simulation_mode:
options.elm.init_can()
project_can_addresses = []
if vehiclefilter:
if vehiclefilter in self.ecu_database.vehiclemap:
for proto, addr in self.ecu_database.vehiclemap[vehiclefilter]:
if proto == u"CAN" and not addr in project_can_addresses:
project_can_addresses.append(addr)
else:
project_can_addresses = self.ecu_database.available_addr_can
if len(project_can_addresses) == 0:
return
if progress:
progress.setRange(0, len(project_can_addresses))
progress.setValue(0)
try_new = []
# Only scan available ecu addresses
for addr in list(set(project_can_addresses)):
i += 1
if progress:
progress.setValue(i)
self.qapp.processEvents()
# Don't want to scan NON ISO-TP
if addr == '00' or addr == 'FF':
continue
if addr not in elm.dnat:
print "Warning, address %s is not mapped" % addr
continue
if len(elm.dnat[addr]) > 3:
print "Skipping CAN extended address (not supported yet) ", addr
continue
print "Scanning ECU %s" % self.ecu_database.addr_group_mapping[addr].encode('ascii', 'ignore')
if not options.simulation_mode:
options.elm.init_can()
options.elm.set_can_addr(addr, {'ecuname': 'SCAN'}, canline)
# Avoid to waste time, try new method : not working -> try old
if not self.identify_new(addr, label):
self.identify_old(addr, label)
if not options.simulation_mode:
options.elm.close_protocol()
return try_new
def scan_kwp(self, progress=None, label=None, vehiclefilter=None):
if options.simulation_mode:
# Test data..
# diagversion, supplier, soft, version, name, group, href, protocol, projects, address):
self.ecus["S2000_Atmo__SoftA3"] = Ecu_ident("004", "213", "00A5", "8300", "UCH", "GRP", "S2000_Atmo___SoftA3.json",
"KWP2000 FastInit MonoPoint", [], "7A")
if not options.simulation_mode:
options.elm.init_iso()
project_kwp_addresses = []
if vehiclefilter:
if vehiclefilter in self.ecu_database.vehiclemap:
for proto, addr in self.ecu_database.vehiclemap[vehiclefilter]:
if proto == u"KWP2000" and not addr in project_kwp_addresses:
project_kwp_addresses.append(addr)
else:
project_kwp_addresses = self.ecu_database.available_addr_kwp
if len(project_kwp_addresses) == 0:
return
i = 0
if progress:
progress.setRange(0, len(project_kwp_addresses))
progress.setValue(0)
for addr in project_kwp_addresses:
i += 1
if progress:
progress.setValue(i)
self.qapp.processEvents()
if not options.simulation_mode:
options.opt_si = True
if not options.elm.set_iso_addr(addr,
{'idTx': '', 'idRx': '',
'ecuname': 'SCAN',
'protocol': "KWP2000"}):
continue
options.elm.start_session_iso('10C0')
can_response = options.elm.request(req='2180', positive='61', cache=False)
else:
# Send some data collected during my tests
if addr == "02":
can_response = "61 80 77 00 31 38 31 04 41 42 45 E3 17 03 00 38 00 07 00 00 00 00 09 11 12 00"
elif addr == "7A":
can_response = "61 80 82 00 23 66 18 14 30 33 37 82 00 08 53 86 00 CB A4 00 70 06 3C 02 B1 A4"
elif addr == "26":
can_response = "61 80 82 00 03 27 76 00 32 31 33 11 01 10 30 08 00 66 00 00 00 41 06 01 F1 38"
else:
continue
self.check_ecu(can_response, label, addr, "KWP")
if not options.simulation_mode:
options.elm.close_protocol()
def check_ecu(self, can_response, label, addr, protocol):
if len(can_response) > 59:
diagversion = str(int(can_response[21:23], 16))
supplier = can_response[24:32].replace(' ', '').decode('hex')
soft = can_response[48:53].replace(' ', '')
version = can_response[54:59].replace(' ', '')
self.check_ecu2(diagversion, supplier, soft, version, label, addr, protocol)
def check_ecu2(self, diagversion, supplier, soft, version, label, addr, protocol):
approximate_ecu = []
found_exact = False
found_approximate = False
if addr in self.ecu_database.addr_group_mapping:
ecu_type = self.ecu_database.addr_group_mapping[addr]
else:
ecu_type = "UNKNOWN"
targetNum = 0
for target in self.ecu_database.targets:
if target.protocol == "CAN" and protocol != "CAN":
continue
if target.protocol.startswith("KWP") and protocol != "KWP":
continue
if target.checkWith(diagversion, supplier, soft, version, addr):
ecuname = "[ " + target.group + " ] " + target.name
self.ecus[ecuname] = target
self.num_ecu_found += 1
if label is not None:
label.setText("Found %i ecu" % self.num_ecu_found)
found_exact = True
href = target.href
line = "Identified ECU [%s]@%s : %s DIAGVERSION [%s]"\
"SUPPLIER [%s] SOFT [%s] VERSION [%s] {%i}"\
% (ecu_type, target.addr, href, diagversion, supplier, soft, version, targetNum)
options.main_window.logview.append(line)
break
elif target.checkApproximate(diagversion, supplier, soft, addr):
approximate_ecu.append(target)
found_approximate = True
targetNum += 1
# Try to find the closest possible version of an ECU
if not found_exact and found_approximate:
min_delta_version = 0xFFFFFF
kept_ecu = None
for tgt in approximate_ecu:
ecu_protocol = 'CAN'
if tgt.protocol.startswith("KWP"):
ecu_protocol = "KWP"
# Shouldn't happen, but...
if tgt.protocol.startswith("ISO8"):
ecu_protocol = "KWP"
if ecu_protocol != protocol:
continue
# If version contains ASCII characters, I can do nothing for you...
try:
int_version = int('0x' + version, 16)
int_tgt_version = int('0x' + tgt.version, 16)
except ValueError:
continue
delta = abs(int_tgt_version - int_version)
if delta < min_delta_version:
min_delta_version = delta
kept_ecu = tgt
if kept_ecu:
self.approximate_ecus[kept_ecu.name] = kept_ecu
self.num_ecu_found += 1
if label is not None:
label.setText("Found %i ecu" % self.num_ecu_found)
line = "Found ECU [%s] (not perfect match) :"\
"%s DIAGVERSION [%s] SUPPLIER [%s] SOFT [%s] VERSION [%s instead %s]"\
% (ecu_type, kept_ecu.name, diagversion, supplier, soft, version, tgt.version)
options.main_window.logview.append(line)
if not found_exact and not found_approximate:
line = "Found ECU [%s] (no relevant ECU file found) :" \
"DIAGVERSION [%s] SUPPLIER [%s] SOFT [%s] VERSION [%s]" \
% (ecu_type, diagversion, supplier, soft, version)
options.main_window.logview.append(line)
def make_zipfs():
options.ecus_dir = "./ecus"
zipoutput = StringIO()
i = 0
ecus = glob.glob("ecus/*.xml")
ecus.remove("ecus/eculist.xml")
with zipfile.ZipFile(zipoutput, mode='w', compression=zipfile.ZIP_DEFLATED, allowZip64=True) as zf:
print("Writing vehicles database...")
db = Ecu_database(True)
zf.writestr("db.json", str(db.dump()))
for target in ecus:
name = target
print "Starting zipping " + target + " " + str(i) + "/" + str(len(ecus))
fileout = name.replace('.xml', '.json')
ecur = Ecu_file(name, True)
zf.writestr(fileout, ecur.dumpJson())
i += 1
# if i == 15:
# break
with open("json/ecus.zip", "w") as f:
f.write(zipoutput.getvalue())
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument('--zipfs', action="store_true", default=None, help="Create a zip filesystem of the XMLs")
parser.add_argument('--testdb', action="store_true", default=None, help="Test ecudatabse loading")
parser.add_argument('--testecufile', action="store_true", default=None, help="Test ecudatabse loading")
parser.add_argument('--convertxml', action="store_true", default=None, help="Convert XML file to JSON")
args = parser.parse_args()
if args.zipfs:
make_zipfs()
if args.testdb:
db = Ecu_database()
if args.convertxml:
db = Ecu_database()
if args.testecufile:
db = Ecu_file("ecus/Sim32_RD3CA0_W44_J77_X85.json", True)