Skip to content

Commit

Permalink
Change parallel write of offset chunks to sequential
Browse files Browse the repository at this point in the history
  • Loading branch information
acezen committed Jan 19, 2024
1 parent 6e7c8ea commit b5edc6f
Showing 1 changed file with 4 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,8 @@ class EdgeWriter(
var chunkIndex: Int = 0
val fileType = edgeInfo.getAdjListFileType(adjListType)
val outputPrefix = prefix + edgeInfo.getOffsetPathPrefix(adjListType)
val offsetChunks = edgeDfAndOffsetDf._2
// writing offset chunks in parallel causes errors, so convert to a sequential collection
val offsetChunks = edgeDfAndOffsetDf._2.seq
offsetChunks.foreach { case (i, offsetChunk) =>
FileSystem.writeDataFrame(
offsetChunk,
Expand Down Expand Up @@ -338,11 +339,11 @@ class EdgeWriter(
val property = pIter.next()
propertyList += "`" + property.getName() + "`"
}
val propetyGroupDf = edgeDfAndOffsetDf._1.select(propertyList.map(col): _*)
val propertyGroupDf = edgeDfAndOffsetDf._1.select(propertyList.map(col): _*)
val outputPrefix =
prefix + edgeInfo.getPropertyGroupPathPrefix(propertyGroup, adjListType)
FileSystem.writeDataFrame(
propetyGroupDf,
propertyGroupDf,
propertyGroup.getFile_type(),
outputPrefix,
None,
Expand Down

0 comments on commit b5edc6f

Please sign in to comment.