Skip to content

Commit

Permalink
mdf4reader : added multiprocessing to channel conversion. quicker for…
Browse files Browse the repository at this point in the history
… complex conversion like tables but slower in case of simple conversion
  • Loading branch information
aymeric.rateau@gmail.com committed Feb 12, 2015
1 parent 658ba8a commit c5c7389
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 21 deletions.
45 changes: 29 additions & 16 deletions mdf4reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
from io import open # for python 3 and 2 consistency
from mdfinfo4 import info4, MDFBlock, ATBlock#, CNBlock
from time import gmtime, strftime
from multiprocessing import Pool,cpu_count
from multiprocessing import Queue, Process
PythonVersion=version_info
PythonVersion=PythonVersion[0]

Expand Down Expand Up @@ -985,14 +985,16 @@ def read4( self, fileName=None, info = None, multiProc = False, channelList=None
L=buf['data']
else:
print('No data in dataGroup '+ str(dataGroup))

if self.multiProc and 'data' in buf:
del buf

if self.multiProc:
for p in proc:
L.update(Q.get()) # concatenate results of processes in dict
for p in proc:
p.join()
del Q # free memory

fid.close() # close file
# After all processing of channels,
# prepare final class data with all its keys
for dataGroup in list(info['DGBlock'].keys()):
Expand Down Expand Up @@ -1065,7 +1067,7 @@ def read4( self, fileName=None, info = None, multiProc = False, channelList=None
if masterDataGroup: #master channel exist
self.masterChannelList[masterDataGroup[dataGroup]].append(name)

fid.close() # close file


if convertAfterRead:
self.convertAllChannel4()
Expand All @@ -1089,7 +1091,7 @@ def getChannelData4(self, channelName):
This method is the safest to get channel data as numpy array from 'data' dict key might contain raw data
"""
if channelName in self:
return convertChannelData4(self[channelName], self.convert_tables)
return convertChannelData4(self[channelName], channelName, self.convert_tables)[channelName]
else:
raise('Channel not in dictionary')

Expand All @@ -1101,7 +1103,7 @@ def convertChannel4(self, channelName):
channelName : str
Name of channel
"""
self[channelName]['data'] = convertChannelData4(self[channelName], self.convert_tables)
self[channelName]['data'] = convertChannelData4(self[channelName], channelName, self.convert_tables)[channelName]
if 'conversion' in self[channelName]:
self[channelName].pop('conversion')

Expand All @@ -1112,16 +1114,22 @@ def convertAllChannel4(self):
if self.multiProc == False:
[self.convertChannel4(channelName) for channelName in self]
else: # multiprocessing
ncpu=cpu_count() # to still have response from PC
if ncpu<1:
ncpu=1
pool = Pool(processes=ncpu)
args = [(self[channelName]['data'], self.convert_tables) for channelName in self]
result = pool.apply_async(convertChannelData4,args)
result.get()
proc = []
Q=Queue()
L={}
for channelName in self:
proc.append( Process(target=convertChannelData4,args=(self[channelName], channelName, self.convert_tables, True, Q)))
proc[-1].start()
for p in proc:
L.update(Q.get()) # concatenate results of processes in dict
for p in proc:
p.join()
del Q # free memory
index=0
for channelName in self:
self[channelName]['data']=result[index]
self[channelName]['data']=L[channelName]
if 'conversion' in self[channelName]:
self[channelName].pop('conversion')
index+=1

def convertName(channelName):
Expand Down Expand Up @@ -1360,7 +1368,7 @@ def processDataBlocks4( Q, buf, info, dataGroup, channelList, multiProc ):
else:
return L

def convertChannelData4(channel, convert_tables=False):
def convertChannelData4(channel, channelName, convert_tables, multiProc=False, Q=None):
"""converts specific channel from raw to physical data according to CCBlock information
Parameters
Expand Down Expand Up @@ -1399,7 +1407,12 @@ def convertChannelData4(channel, convert_tables=False):
vect = textToValueConv(vect, channel['conversion']['parameters']['cc_val'], channel['conversion']['parameters']['cc_ref'])
elif conversion_type == 10 and text_type and convert_tables:
vect = textToTextConv(vect, channel['conversion']['parameters']['cc_ref'])
return vect
L={}
L[channelName]=vect
if multiProc:
Q.put(L)
else:
return L

def linearConv(vect, cc_val):
""" apply linear conversion to data
Expand Down
15 changes: 10 additions & 5 deletions mdfreader.py
Original file line number Diff line number Diff line change
Expand Up @@ -649,11 +649,16 @@ def setAttribute(f, name, value):
f = netcdf.netcdf_file( filename, 'w' )
setAttribute( f, 'Date', (self.date))
setAttribute( f, 'Time', (self.time))
setAttribute(f, 'Author', self.author)
setAttribute( f, 'Organization', (self.organisation))
setAttribute( f, 'ProjectName', (self.project))
setAttribute( f, 'Subject', (self.subject))
setAttribute( f, 'Comment', (self.comment))
if self.author is not None:
setAttribute(f, 'Author', self.author)
if self.organisation is not None:
setAttribute( f, 'Organization', (self.organisation))
if self.project is not None:
setAttribute( f, 'ProjectName', (self.project))
if self.subject is not None:
setAttribute( f, 'Subject', (self.subject))
if self.comment is not None:
setAttribute( f, 'Comment', (self.comment))
# Create dimensions having name of all time channels
for time in list(self.masterChannelList.keys()):
f.createDimension( time, len( self.getChannelData(time) ) )
Expand Down

0 comments on commit c5c7389

Please sign in to comment.