Skip to content

Commit

Permalink
Merge pull request #20 from Riscure/experiments/chipwhisperer_converter
Browse files Browse the repository at this point in the history
Experiments/chipwhisperer converter
  • Loading branch information
TomHogervorst authored Oct 13, 2022
2 parents 1ac882d + a718ae0 commit d27cf28
Show file tree
Hide file tree
Showing 5 changed files with 267 additions and 174 deletions.
Empty file added riscure/__init__.py
Empty file.
93 changes: 93 additions & 0 deletions riscure/chipwhisperer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
# To use this converter, please install the 'chipwhisperer' and 'scipy' packages from PIPy
import os
import re
from os.path import dirname
from typing import Any

from chipwhisperer.common.api.ProjectFormat import Project

from trsfile import TraceSet, Header, TracePadding, Trace, SampleCoding
from trsfile.parametermap import TraceSetParameterMap, TraceParameterDefinitionMap, TraceParameterMap
from trsfile.traceparameter import TraceParameterDefinition, ParameterType, StringParameter, ByteArrayParameter


def to_trs(path_to_project: str, output_path: str, trace_index: int = 0):
project = Project()
project.load(path_to_project)
container = project.trace_manager().get_segment(trace_index)
traceset_parameters = TraceSetParameterMap()
headers = {
Header.DESCRIPTION: read_or_default(container.config, 'notes'),
Header.ACQUISITION_DEVICE_ID: read_or_default(container.config, 'scopeName'),
Header.SCALE_X: 1 / float(read_or_default(container.config, 'scopeSampleRate', 1)),
Header.TRACE_SET_PARAMETERS: traceset_parameters
}
container.loadAllTraces()
contains_textin = container.textins is not None and len(container.textins) > 0 and container.textins[0] is not None
contains_keylist = container.keylist is not None and len(container.keylist) > 0 and container.keylist[0] is not None
contains_textout = container.textouts is not None and len(container.textouts) > 0 and container.textouts[0] is not None

traceset_parameters['TARGET_SW'] = StringParameter(read_or_default(container.config, 'targetSW'))
traceset_parameters['TARGET_HW'] = StringParameter(read_or_default(container.config, 'targetHW'))
traceset_parameters['DATE'] = StringParameter(read_or_default(container.config, 'date'))

# Try to read settings file and add additional parameters
try:
trace_folder = dirname(os.path.abspath(container.config.config.filename))
extra_parameters = CWSettings.read(
os.path.join(trace_folder, container.config.attr('prefix') + 'settings.cwset'))
traceset_parameters.update(extra_parameters)
except:
print('Warning: Failed to read additional settings. Trace reading will continue without additional settings.')

with TraceSet(path=output_path,
mode='w',
headers=headers,
padding_mode=TracePadding.AUTO) as trace_set:
for n in range(0, len(container.traces)):
trace_parameters = TraceParameterMap()
if contains_textin:
trace_parameters['INPUT'] = ByteArrayParameter(container.getTextin(n))
if contains_textout:
trace_parameters['OUTPUT'] = ByteArrayParameter(container.getTextout(n))
if contains_keylist:
trace_parameters['KEY'] = ByteArrayParameter(container.getKnownKey(n))
trace_set.append(Trace(SampleCoding.FLOAT, container.getTrace(n), trace_parameters))


def read_or_default(config, attr: str, default: Any = ''):
try:
return config.attr(attr)
except:
return default


class CWSettings:
@staticmethod
def read(path: str) -> TraceSetParameterMap:
parameters = TraceSetParameterMap()
with open(path) as settings_file:
lines = settings_file.readlines()
category = ''
for line in lines:
line = line.strip()
if line.startswith('['):
category = CWSettings.get_category(line, category)
else:
matcher = re.search('(.*) = (.*)', line)
if matcher:
parameters[category + ":" + matcher.group(1)] = StringParameter(matcher.group(2))
return parameters

@staticmethod
def get_category(line: str, category: str, level: int = 0) -> str:
sub_category = line.replace('[', '').replace(']', '')
if line.startswith('['):
return CWSettings.get_category(line[1:], category, level + 1)
else:
if len(category.split(':')) >= level:
category = ':'.join(category.split(':')[0:level - 1])
if category == '':
return sub_category
else:
return category + ':' + sub_category
1 change: 1 addition & 0 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ project_urls =
[options]
packages =
trsfile
riscure
install_requires =
numpy
include_package_data = True
Expand Down
238 changes: 120 additions & 118 deletions trsfile/trace_set.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,166 +6,168 @@
from trsfile.engine.file import FileEngine

engines = {
'trsengine': TrsEngine,
'fileengine': FileEngine,
'trsengine': TrsEngine,
'fileengine': FileEngine,
}
ENGINE = 'engine'


class TraceSet:
"""The :py:obj:`TraceSet` class behaves like a :py:obj:`list`
object were each item in the list is a :py:obj:`Trace`.
"""The :py:obj:`TraceSet` class behaves like a :py:obj:`list`
object were each item in the list is a :py:obj:`Trace`.
Storing the :py:obj:`TraceSet` requires knowledge on the format which is
resolved through the usage of storage engines (:py:obj:`Engine`).
"""
Storing the :py:obj:`TraceSet` requires knowledge on the format which is
resolved through the usage of storage engines (:py:obj:`Engine`).
"""

def __init__(self, path, mode = 'r', **options):
# Defaults
self.engine = None
def __init__(self, path, mode='r', **options):
# Defaults
self.engine = None

# Get the storage engine if one is given, else default to TrsEngine
engine = options.get(ENGINE, TrsEngine)
if ENGINE in options:
del options[ENGINE]
# Get the storage engine if one is given, else default to TrsEngine
engine = options.get(ENGINE, TrsEngine)
if ENGINE in options:
del options[ENGINE]

# We also support engine to be passed as string
if isinstance(engine, str):
engine_name = engine.lower()
if not engine_name.endswith(ENGINE):
engine_name += ENGINE
if engine_name not in engines:
raise ValueError('The storage engine \'{0:s}\'does not exists'.format(engine_name))
engine = engines[engine_name]
# We also support engine to be passed as string
if isinstance(engine, str):
engine_name = engine.lower()
if not engine_name.endswith(ENGINE):
engine_name += ENGINE
if engine_name not in engines:
raise ValueError('The storage engine \'{0:s}\'does not exists'.format(engine_name))
engine = engines[engine_name]

# Check type
if not issubclass(engine, Engine):
raise TypeError('The storage engine has to be of type \'Engine\'')
# Check type
if not issubclass(engine, Engine):
raise TypeError('The storage engine has to be of type \'Engine\'')

self.engine = engine(path, mode, **options)
self.engine = engine(path, mode, **options)

def __del__(self):
self.close()
def __del__(self):
self.close()

def __iter__(self):
""" reset pointer """
self.iterator_index = -1
return self
def __iter__(self):
""" reset pointer """
self.iterator_index = -1
return self

def __next__(self):
self.iterator_index = self.iterator_index + 1
def __next__(self):
self.iterator_index = self.iterator_index + 1

if self.iterator_index >= len(self):
raise StopIteration
return self[self.iterator_index]
if self.iterator_index >= len(self):
raise StopIteration
return self[self.iterator_index]

def __enter__(self):
"""Called when entering a `with` block"""
if self.engine.is_closed():
raise ValueError('I/O operation on closed trace set')
return self
def __enter__(self):
"""Called when entering a `with` block"""
if self.engine.is_closed():
raise ValueError('I/O operation on closed trace set')
return self

def __exit__(self, *args):
"""Called when exiting a `with` block"""
self.close()
def __exit__(self, *args):
"""Called when exiting a `with` block"""
self.close()

def __repr__(self):
if len(self) <= 0:
return '<TraceSet (0), empty>'
else:
return '<TraceSet ({0:d}), {1:s}, ... ,{2:s}>'.format(len(self), repr(self[0]), repr(self[-1]))
def __repr__(self):
if len(self) <= 0:
return '<TraceSet (0), empty>'
else:
return '<TraceSet ({0:d}), {1:s}, ... ,{2:s}>'.format(len(self), repr(self[0]), repr(self[-1]))

def __len__(self):
if self.engine.is_closed():
raise ValueError('I/O operation on closed trace set')
def __len__(self):
if self.engine.is_closed():
raise ValueError('I/O operation on closed trace set')

return self.engine.length()
return self.engine.length()

def __delitem__(self, index):
if self.engine.is_closed():
raise ValueError('I/O operation on closed trace set')
def __delitem__(self, index):
if self.engine.is_closed():
raise ValueError('I/O operation on closed trace set')

if self.engine.is_read_only():
raise TypeError('Cannot modify trace set, it is (opened) read-only')
if self.engine.is_read_only():
raise TypeError('Cannot modify trace set, it is (opened) read-only')

return self.engine.del_traces(index)
return self.engine.del_traces(index)

def __setitem__(self, index, traces):
if self.engine.is_closed():
raise ValueError('I/O operation on closed trace set')
def __setitem__(self, index, traces):
if self.engine.is_closed():
raise ValueError('I/O operation on closed trace set')

if self.engine.is_read_only():
raise TypeError('Cannot modify trace set, it is (opened) read-only')
if self.engine.is_read_only():
raise TypeError('Cannot modify trace set, it is (opened) read-only')

# Make sure we have iterable traces
if isinstance(traces, Trace):
traces = [traces]
# Make sure we have iterable traces
if isinstance(traces, Trace):
traces = [traces]

# Make sure we only are setting traces
if any(not isinstance(trace, Trace) for trace in traces):
raise TypeError('All objects assigned to a trace set needs to be of type \'Trace\'')
# Make sure we only are setting traces
if any(not isinstance(trace, Trace) for trace in traces):
raise TypeError('All objects assigned to a trace set needs to be of type \'Trace\'')

return self.engine.set_traces(index, traces)
return self.engine.set_traces(index, traces)

def __getitem__(self, index):
if self.engine.is_closed():
raise ValueError('I/O operation on closed trace set')
def __getitem__(self, index):
if self.engine.is_closed():
raise ValueError('I/O operation on closed trace set')

traces = self.engine.get_traces(index)
traces = self.engine.get_traces(index)

# Return the select item(s)
if isinstance(index, slice):
return traces
else:
# Earlier logic should ensure traces contains one element!
return traces[0]
# Return the select item(s)
if isinstance(index, slice):
return traces
else:
# Earlier logic should ensure traces contains one element!
return traces[0]

def is_closed(self):
return self.engine.is_closed()
def is_closed(self):
return self.engine.is_closed()

def close(self):
if self.engine is not None:
self.engine.close()
def close(self):
if self.engine is not None:
self.engine.close()

def append(self, trace):
self[len(self):len(self)] = trace
def append(self, trace):
self[len(self):len(self)] = trace

def extend(self, traces):
self[len(self):len(self)] = traces
def extend(self, traces):
self[len(self):len(self)] = traces

def insert(self, index, trace):
# TODO: Not yet implemented nicely
#self[index:index] = trace
raise NotImplementedError('Insert has not yet been implemented for TraceSet, please raise a Github ticket for priority!')
def insert(self, index, trace):
# TODO: Not yet implemented nicely
# self[index:index] = trace
raise NotImplementedError(
'Insert has not yet been implemented for TraceSet, please raise a Github ticket for priority!')

def reverse(self):
return self[::-1]
def reverse(self):
return self[::-1]

def update_headers(self, headers):
if self.engine.is_closed():
raise ValueError('I/O operation on closed trace set')
def update_headers(self, headers):
if self.engine.is_closed():
raise ValueError('I/O operation on closed trace set')

return self.engine.update_headers(headers)
return self.engine.update_headers(headers)

def update_header(self, header, value):
return self.update_headers({header: value})
def update_header(self, header, value):
return self.update_headers({header: value})

def get_headers(self):
return self.engine.headers
def get_headers(self):
return self.engine.headers

def get_header(self, header):
return self.engine.headers[header]
def get_header(self, header):
return self.engine.headers[header]

def __eq__(self, other):
"""Compares two trace sets to each other"""
if not isinstance(other, TrsFile):
return False
def __eq__(self, other):
"""Compares two trace sets to each other"""
if not isinstance(other, TraceSet):
return False

if len(self) != len(other):
return False
if len(self) != len(other):
return False

# Not using any, because we want to stop as soon as a difference arises
for self_trace, other_trace in zip(self, other):
if self_trace != other_trace:
return False
# Not using any, because we want to stop as soon as a difference arises
for self_trace, other_trace in zip(self, other):
if self_trace != other_trace:
return False

return True
return True
Loading

0 comments on commit d27cf28

Please sign in to comment.