diff --git a/spark/src/main/scala/com/alibaba/graphar/writer/EdgeWriter.scala b/spark/src/main/scala/com/alibaba/graphar/writer/EdgeWriter.scala
index 5667b29e8..b391251ab 100644
--- a/spark/src/main/scala/com/alibaba/graphar/writer/EdgeWriter.scala
+++ b/spark/src/main/scala/com/alibaba/graphar/writer/EdgeWriter.scala
@@ -36,6 +36,7 @@ import org.apache.spark.sql.types.{
 }
 import org.apache.spark.sql.functions._
 
+import scala.collection.parallel.immutable.ParSeq
 import scala.collection.SortedMap
 import scala.collection.mutable.ArrayBuffer
 
@@ -47,7 +48,7 @@ object EdgeWriter {
       edgeInfo: EdgeInfo,
       adjListType: AdjListType.Value,
       vertexNumOfPrimaryVertexLabel: Long
-  ): (DataFrame, Seq[DataFrame], Array[Long], Map[Long, Int]) = {
+  ): (DataFrame, ParSeq[(Int, DataFrame)], Array[Long], Map[Long, Int]) = {
     val edgeSchema = edgeDf.schema
     val colName = if (
       adjListType == AdjListType.ordered_by_source || adjListType == AdjListType.unordered_by_source
@@ -150,34 +151,35 @@ object EdgeWriter {
       val offsetDfSchema = StructType(
         Seq(StructField(GeneralParams.offsetCol, IntegerType))
       )
-      val offsetDfArray: Seq[DataFrame] = (0 until vertexChunkNum).map { i =>
-        {
-          val filterRDD = edgeCountsByPrimaryKey
-            .filter(v => v._1 / vertexChunkSize == i)
-            .map { case (k, v) => (k - i * vertexChunkSize + 1, v) }
-          val initRDD = spark.sparkContext.parallelize(
-            (0L to vertexChunkSize).map(key => (key, 0))
-          )
-          val unionRDD = spark.sparkContext
-            .union(filterRDD, initRDD)
-            .reduceByKey(_ + _)
-            .sortByKey(numPartitions = 1)
-          val offsetRDD = unionRDD
-            .mapPartitionsWithIndex((i, ps) => {
-              var sum = 0
-              var preSum = 0
-              for ((k, count) <- ps) yield {
-                preSum = sum
-                sum = sum + count
-                (k, count + preSum)
-              }
-            })
-            .map { case (k, v) => Row(v) }
-          val offsetChunk = spark.createDataFrame(offsetRDD, offsetDfSchema)
-          offsetChunk.persist(GeneralParams.defaultStorageLevel)
-          offsetChunk
+      val offsetDfArray: ParSeq[(Int, DataFrame)] =
+        (0 until vertexChunkNum).par.map { i =>
+          {
+            val filterRDD = edgeCountsByPrimaryKey
+              .filter(v => v._1 / vertexChunkSize == i)
+              .map { case (k, v) => (k - i * vertexChunkSize + 1, v) }
+            val initRDD = spark.sparkContext.parallelize(
+              (0L to vertexChunkSize).map(key => (key, 0))
+            )
+            val unionRDD = spark.sparkContext
+              .union(filterRDD, initRDD)
+              .reduceByKey(_ + _)
+              .sortByKey(numPartitions = 1)
+            val offsetRDD = unionRDD
+              .mapPartitionsWithIndex((i, ps) => {
+                var sum = 0
+                var preSum = 0
+                for ((k, count) <- ps) yield {
+                  preSum = sum
+                  sum = sum + count
+                  (k, count + preSum)
+                }
+              })
+              .map { case (k, v) => Row(v) }
+            val offsetChunk = spark.createDataFrame(offsetRDD, offsetDfSchema)
+            offsetChunk.persist(GeneralParams.defaultStorageLevel)
+            (i, offsetChunk)
+          }
         }
-      }
       edgeCountsByPrimaryKey.unpersist() // unpersist the edgeCountsByPrimaryKey
       return (
         partitionEdgeDf,
@@ -186,7 +188,7 @@ object EdgeWriter {
         edgeNumMutableMap.toMap
       )
     }
-    val offsetDfArray = Seq.empty[DataFrame]
+    val offsetDfArray = ParSeq.empty[(Int, DataFrame)]
     return (
       partitionEdgeDf,
       offsetDfArray,
@@ -258,7 +260,7 @@ class EdgeWriter(
   }
 
   private val edgeDfAndOffsetDf
-      : (DataFrame, Seq[DataFrame], Array[Long], Map[Long, Int]) =
+      : (DataFrame, ParSeq[(Int, DataFrame)], Array[Long], Map[Long, Int]) =
     EdgeWriter.repartitionAndSort(
       spark,
       edgeDf,
@@ -275,9 +277,10 @@ class EdgeWriter(
       else edgeInfo.getDst_chunk_size()
     val vertexChunkNum: Int =
       ((vertexNum + vertexChunkSize - 1) / vertexChunkSize).toInt
-    for (i <- 0 until vertexChunkNum) {
-      val edgeNumber = edgeDfAndOffsetDf._4(i)
-      val outputPath = prefix + edgeInfo.getEdgesNumFilePath(i, adjListType)
+    val parallelEdgeNums = edgeDfAndOffsetDf._4.par
+    parallelEdgeNums.foreach { case (chunkIndex, edgeNumber) =>
+      val outputPath =
+        prefix + edgeInfo.getEdgesNumFilePath(chunkIndex, adjListType)
       FileSystem.writeValue(
         edgeNumber,
         outputPath,
@@ -291,16 +294,17 @@ class EdgeWriter(
     var chunkIndex: Int = 0
     val fileType = edgeInfo.getAdjListFileType(adjListType)
     val outputPrefix = prefix + edgeInfo.getOffsetPathPrefix(adjListType)
-    for (offsetChunk <- edgeDfAndOffsetDf._2) {
+    // TODO(@acezen): Support parallel write with GarDataSource
+    val offsetChunks = edgeDfAndOffsetDf._2.seq
+    offsetChunks.foreach { case (i, offsetChunk) =>
       FileSystem.writeDataFrame(
         offsetChunk,
         FileType.FileTypeToString(fileType),
         outputPrefix,
-        Some(chunkIndex),
+        Some(i),
         None
       )
       offsetChunk.unpersist()
-      chunkIndex = chunkIndex + 1
     }
   }
 
@@ -346,11 +350,11 @@ class EdgeWriter(
       val property = pIter.next()
       propertyList += "`" + property.getName() + "`"
     }
-    val propetyGroupDf = edgeDfAndOffsetDf._1.select(propertyList.map(col): _*)
+    val propertyGroupDf = edgeDfAndOffsetDf._1.select(propertyList.map(col): _*)
     val outputPrefix =
       prefix + edgeInfo.getPropertyGroupPathPrefix(propertyGroup, adjListType)
     FileSystem.writeDataFrame(
-      propetyGroupDf,
+      propertyGroupDf,
      propertyGroup.getFile_type(),
       outputPrefix,
       None,
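
Note (not part of the patch): the change replaces the sequential Seq[DataFrame] of offset chunks with a scala.collection.parallel.immutable.ParSeq of (chunk index, DataFrame) pairs, so chunk generation and the edge-number writes run on parallel collections, while the offset-chunk write itself drops back to a sequential view via .seq (see the TODO about GarDataSource). The following is a minimal standalone sketch of that pattern only, assuming Scala 2.12-style parallel collections (Scala 2.13 needs the scala-parallel-collections module and CollectionConverters import); the object name and values are illustrative and not part of EdgeWriter.

import scala.collection.parallel.immutable.ParSeq

object ParChunkSketch {
  def main(args: Array[String]): Unit = {
    val vertexChunkNum = 4 // hypothetical chunk count, for illustration only

    // Build each chunk in parallel, carrying its index with the result,
    // mirroring the (Int, DataFrame) pairs returned by repartitionAndSort.
    val chunks: ParSeq[(Int, Seq[Long])] =
      (0 until vertexChunkNum).par.map { i =>
        val offsets: Seq[Long] = (0L to 3L).map(_ + i * 4L) // stand-in for an offset chunk
        (i, offsets)
      }

    // Switch back to a sequential view before the side-effecting writes,
    // as writeOffset does with edgeDfAndOffsetDf._2.seq.
    chunks.seq.foreach { case (i, offsets) =>
      println(s"chunk $i -> ${offsets.mkString(", ")}")
    }
  }
}

Carrying the index in the pair (rather than relying on a mutable counter, as the old chunkIndex loop did) keeps each write independent of iteration order, which is what makes the parallel variants safe.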