Commit f8efa4f
Changed from a shared-memory dict (multiprocessing.Manager) to a simple Queue to exchange data between processes -> quicker/more efficient and hopefully compatible with Windows environments
aymeric.rateau@gmail.com committed Sep 13, 2011
1 parent ced2aac commit f8efa4f
Showing 1 changed file with 15 additions and 14 deletions.
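The scheme this commit adopts: each worker process fills an ordinary local dict and puts it on a multiprocessing.Queue once, and the parent drains the queue and merges the partial results. Under the old scheme every process wrote through a Manager-proxied dict, where each access is its own inter-process round trip. A minimal standalone sketch of the new pattern (worker, data_group and the result keys are illustrative names, not mdfreader's API):

    # Minimal sketch of the Manager-dict -> Queue pattern this commit adopts.
    # All names below are illustrative, not mdfreader's actual API.
    import multiprocessing

    def worker( q, data_group ):
        local = {} # plain local dict: no cross-process proxy traffic
        local['group%d' % data_group] = [x * 2 for x in range( 5 )]
        q.put( local ) # one IPC transfer per process, at the end

    if __name__ == '__main__': # guard needed on Windows (spawn start method)
        q = multiprocessing.Queue()
        procs = [multiprocessing.Process( target = worker, args = ( q, g ) ) for g in range( 4 )]
        for p in procs:
            p.start()
        results = {}
        for _ in procs: # exactly one get() per spawned worker
            results.update( q.get() )
        for p in procs:
            p.join() # join only after draining the queue
        print( sorted( results.keys() ) )

Draining before joining matters: per the multiprocessing docs, a process that has put data on a queue will not terminate until that data is flushed to the pipe, so joining first can deadlock.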
mdfreader.py (29 changes: 15 additions & 14 deletions)
@@ -59,13 +59,13 @@
 import sys
 #import time
 
-def processDataBlocks( L, buf, info, numberOfRecords, dataGroup ):
+def processDataBlocks( Q, buf, info, numberOfRecords, dataGroup ):
     ## Processes recorded data blocks
     # Outside of class to allow multiprocessing
     numberOfRecordIDs = info.DGBlock[dataGroup]['numberOfRecordIDs']
     #procTime = time.clock()
     #print( 'Start process ' + str( dataGroup ) + ' Number of Channels ' + str( info.CGBlock[dataGroup][0]['numberOfChannels'] ) + ' of length ' + str( len( buf ) ) + ' ' + str( time.clock() ) )
-
+    L={}
     isBitInUnit8 = 0 # Initialize Bit counter
     ## Processes Bits, metadata and conversion
     for channelGroup in range( info.DGBlock[dataGroup]['numberOfChannelGroups'] ):
@@ -168,7 +168,7 @@ def processDataBlocks( L, buf, info, numberOfRecords, dataGroup ):
                 print( 'Conversion of formula : ' + info.CCBlock[dataGroup][channelGroup][channel]['conversion']['textFormula'] + 'not yet supported' )
             elif conversionFormulaIdentifier == 12: # Text Range Table
                 pass # Not yet supported, practically not used format
-
+    Q.put(L)
 #print( 'Process ' + str( dataGroup ) + ' Finished ' + str( time.clock() - procTime ) )
 
 class superDict( dict ):
@@ -683,17 +683,16 @@ def read( self, fileName = None, multiProc = True, channelList=None):
 # Open file
 fid = open( self.fileName, 'rb', buffering = 65536 )
 
-# prepare multiprocessing of dataGroups
-proc = []
-manager = multiprocessing.Manager()
-L = manager.dict()
-
 # Look for the biggest group to process first, to reduce processing time
 dataGroupList = dict.fromkeys( range( info.HDBlock['numberOfDataGroups'] ) )
 for dataGroup in dataGroupList.keys():
     dataGroupList[dataGroup] = info.CGBlock[dataGroup][0]['numberOfRecords']
 sortedDataGroup = sorted( dataGroupList, key = dataGroupList.__getitem__, reverse = True )
 
+# prepare multiprocessing of dataGroups
+proc = []
+Q=multiprocessing.Queue()
+
 ## Defines record format
 for dataGroup in sortedDataGroup:
     # Number for records before and after the data records
@@ -744,19 +743,21 @@ def read( self, fileName = None, multiProc = True, channelList=None):
     #print( 'Group ' + str( dataGroup ) + ' ' + str( time.clock() - inttime ) )
     ## reads the records
     buf = numpy.core.records.fromfile( fid, dtype = numpyDataRecordFormat, shape = numberOfRecords )
-
     if self.multiProc:
         proc.append( multiprocessing.Process( target = processDataBlocks,
-            args = ( L, buf, info, numberOfRecords, dataGroup ) ) )
+            args = ( Q, buf, info, numberOfRecords, dataGroup ) ) )
         proc[-1].start()
     else: # for debugging purpose, can switch off multiprocessing
-        processDataBlocks( L, buf, info, numberOfRecords, dataGroup )
+        processDataBlocks( Q, buf, info, numberOfRecords, dataGroup )
 
 fid.close() # close file
 
 if self.multiProc:
-    for i in range ( len( proc ) ): # Make sure all processes are finished
-        proc[i].join( 120 ) # waits for 2 minutes, should be enough ?
+    L={}
+    dataGroupsProcessed=1
+    while dataGroupsProcessed<len(dataGroupList):
+        L.update(Q.get()) # concatenate results of processes in dict
+        dataGroupsProcessed+=1
 
 # After all processing of channels,
 # prepare final class data with all its keys
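On the unchanged read path shown above, numpy.core.records.fromfile pulls a fixed number of fixed-size records straight from the open file into a structured record array. A minimal sketch with a made-up two-field layout (the real numpyDataRecordFormat is assembled from the MDF channel block metadata):

    # Sketch of reading fixed-size binary records with a structured dtype.
    # The field layout here is hypothetical; mdfreader derives the real one
    # from the channel metadata blocks of the MDF file.
    import numpy

    numpyDataRecordFormat = [( 'time', '<f8' ), ( 'value', '<f4' )]
    numberOfRecords = 10

    # Write a few zeroed records so the read below has something to parse.
    numpy.zeros( numberOfRecords, dtype = numpyDataRecordFormat ).tofile( 'example.dat' )

    fid = open( 'example.dat', 'rb' )
    buf = numpy.core.records.fromfile( fid, dtype = numpyDataRecordFormat, shape = numberOfRecords )
    fid.close()

    print( buf['time'] ) # each named field is addressable as a column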
