[CELEBORN-1446] Enable chunk prefetch when initializing CelebornInputStream

### What changes were proposed in this pull request?
apache#2348 avoids fetching the first chunk in the constructor
of `CelebornInputStreamImpl`, but in some cases, e.g. when Spark coalesces 3000 partitions into one,
doing so is beneficial for performance. This PR adds the prefetch back behind knobs that default to false.

### Why are the changes needed?
Eagerly fetching the first chunk and pre-creating input streams improves shuffle read performance when a task reads many coalesced partitions, as described above.

### Does this PR introduce _any_ user-facing change?
Yes, two configs are added.
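For illustration, a minimal sketch of how the two new knobs could be set from a Spark application; the `spark.` prefix for Celeborn client configs follows the existing entries in `docs/configuration/client.md`, and the values shown are examples rather than recommendations:

```scala
import org.apache.spark.SparkConf

// Example only: turn on eager first-chunk fetching and widen the
// input-stream pre-creation window for heavily coalesced reads.
val conf = new SparkConf()
  .set("spark.celeborn.client.chunk.prefetch.enabled", "true")
  .set("spark.celeborn.client.inputStream.creation.window", "32")
```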

### How was this patch tested?
Extended `MemorySkewJoinSuite` and `ReusedExchangeSuite`, and manual test.

Closes apache#2549 from waitinfuture/1446.

Authored-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>
waitinfuture authored and RexXiong committed Jul 18, 2024
1 parent 0dc84a5 commit d692e49
Showing 6 changed files with 189 additions and 88 deletions.
@@ -188,7 +188,9 @@ class CelebornShuffleReader[K, C](
val end = System.currentTimeMillis()
logInfo(s"BatchOpenStream for $partCnt cost ${end - startTime}ms")

def createInputStream(partitionId: Int): CelebornInputStream = {
val streams = new ConcurrentHashMap[Integer, CelebornInputStream]()

def createInputStream(partitionId: Int): Unit = {
val locations =
if (fileGroups.partitionGroups.containsKey(partitionId)) {
new util.ArrayList(fileGroups.partitionGroups.get(partitionId))
@@ -203,7 +205,7 @@
} else null
if (exceptionRef.get() == null) {
try {
shuffleClient.readPartition(
val inputStream = shuffleClient.readPartition(
shuffleId,
handle.shuffleId,
partitionId,
@@ -215,45 +217,70 @@
streamHandlers,
fileGroups.mapAttempts,
metricsCallback)
streams.put(partitionId, inputStream)
} catch {
case e: IOException =>
logError(s"Exception caught when readPartition $partitionId!", e)
exceptionRef.compareAndSet(null, e)
null
case e: Throwable =>
logError(s"Non IOException caught when readPartition $partitionId!", e)
exceptionRef.compareAndSet(null, new CelebornIOException(e))
null
}
} else null
}
}

val inputStreamCreationWindow = conf.clientInputStreamCreationWindow
(startPartition until Math.min(
startPartition + inputStreamCreationWindow,
endPartition)).foreach(partitionId => {
streamCreatorPool.submit(new Runnable {
override def run(): Unit = {
createInputStream(partitionId)
}
})
})

val recordIter = (startPartition until endPartition).iterator.map(partitionId => {
if (handle.numMappers > 0) {
val startFetchWait = System.nanoTime()
val inputStream: CelebornInputStream = createInputStream(partitionId)
if (exceptionRef.get() != null) {
exceptionRef.get() match {
case ce @ (_: CelebornIOException | _: PartitionUnRetryAbleException) =>
if (throwsFetchFailure &&
shuffleClient.reportShuffleFetchFailure(handle.shuffleId, shuffleId)) {
throw new FetchFailedException(
null,
handle.shuffleId,
-1,
-1,
partitionId,
SparkUtils.FETCH_FAILURE_ERROR_MSG + handle.shuffleId + "/" + shuffleId,
ce)
} else
throw ce
case e => throw e
var inputStream: CelebornInputStream = streams.get(partitionId)
while (inputStream == null) {
if (exceptionRef.get() != null) {
exceptionRef.get() match {
case ce @ (_: CelebornIOException | _: PartitionUnRetryAbleException) =>
if (throwsFetchFailure &&
shuffleClient.reportShuffleFetchFailure(handle.shuffleId, shuffleId)) {
throw new FetchFailedException(
null,
handle.shuffleId,
-1,
-1,
partitionId,
SparkUtils.FETCH_FAILURE_ERROR_MSG + handle.shuffleId + "/" + shuffleId,
ce)
} else
throw ce
case e => throw e
}
}
logInfo("inputStream is null, sleeping...")
Thread.sleep(50)
inputStream = streams.get(partitionId)
}
metricsCallback.incReadTime(
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startFetchWait))
// ensure inputStream is closed when task completes
context.addTaskCompletionListener[Unit](_ => inputStream.close())

// Advance the input creation window
if (partitionId + inputStreamCreationWindow < endPartition) {
streamCreatorPool.submit(new Runnable {
override def run(): Unit = {
createInputStream(partitionId + inputStreamCreationWindow)
}
})
}

(partitionId, inputStream)
} else {
(partitionId, CelebornInputStream.empty())
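As a reading aid for the hunk above, here is a self-contained sketch of the sliding-window pattern the reader now uses: the first `window` stream creations are submitted to a pool up front, the consumer polls the shared map until its stream appears, and each consumed partition tops the window up by one. Names and types are simplified stand-ins, not the actual Celeborn classes:

```scala
import java.util.concurrent.{ConcurrentHashMap, Executors}

object SlidingWindowSketch {
  def main(args: Array[String]): Unit = {
    val (start, end, window) = (0, 10, 3)
    val pool = Executors.newFixedThreadPool(2)
    val streams = new ConcurrentHashMap[Integer, String]()

    // Stand-in for createInputStream: "opening" a partition takes a while.
    def create(p: Int): Unit = { Thread.sleep(20); streams.put(p, s"stream-$p") }

    // Pre-create the first `window` streams asynchronously.
    (start until math.min(start + window, end)).foreach { p =>
      pool.submit(new Runnable { override def run(): Unit = create(p) })
    }

    (start until end).foreach { p =>
      // Poll until the asynchronously created stream shows up.
      var s = streams.get(p)
      while (s == null) { Thread.sleep(5); s = streams.get(p) }
      // Keep the window full: start creating the stream `window` slots ahead.
      if (p + window < end) {
        pool.submit(new Runnable { override def run(): Unit = create(p + window) })
      }
      println(s"consumed $s")
    }
    pool.shutdown()
  }
}
```

The real reader additionally checks `exceptionRef` inside the polling loop so that a failed `readPartition` surfaces as a fetch failure instead of an endless wait.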
@@ -223,7 +223,12 @@ private static final class CelebornInputStreamImpl extends CelebornInputStream {
this.shuffleId = shuffleId;
this.shuffleClient = shuffleClient;

moveToNextReader(false);
boolean chunkPrefetchEnabled = conf.clientChunkPrefetchEnabled();
moveToNextReader(chunkPrefetchEnabled);
if (chunkPrefetchEnabled) {
init();
firstChunk = false;
}
}

private boolean skipLocation(int startMapIndex, int endMapIndex, PartitionLocation location) {
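Conceptually, the constructor change above trades lazy for eager initialization: with `celeborn.client.chunk.prefetch.enabled` off, the first chunk is still fetched on the first read (the behavior from apache#2348); with it on, the fetch happens while the stream is being constructed. A hedged Scala sketch of that toggle, using hypothetical names rather than the real Java class:

```scala
// Hypothetical sketch of eager vs. lazy first-chunk fetching; `fetchChunk`
// stands in for the chunk-fetching machinery of the real input stream.
class PrefetchingStream(prefetchEnabled: Boolean, fetchChunk: () => Array[Byte]) {
  // Eager path: pay the fetch cost during construction (prefetch enabled).
  private var pending: Option[Array[Byte]] =
    if (prefetchEnabled) Some(fetchChunk()) else None

  def readChunk(): Array[Byte] = pending match {
    case Some(chunk) =>
      pending = None // hand out the chunk prefetched in the constructor
      chunk
    case None =>
      fetchChunk() // lazy path: the first read pays the fetch cost instead
  }
}
```

Combined with the creation window in `CelebornShuffleReader`, the eager path lets first-chunk fetches overlap with the setup of the streams for the partitions that follow.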
@@ -1275,6 +1275,8 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se
get(CLIENT_RESULT_PARTITION_SUPPORT_FLOATING_BUFFER)
def clientFlinkDataCompressionEnabled: Boolean = get(CLIENT_DATA_COMPRESSION_ENABLED)
def clientShuffleMapPartitionSplitEnabled = get(CLIENT_SHUFFLE_MAPPARTITION_SPLIT_ENABLED)
def clientChunkPrefetchEnabled = get(CLIENT_CHUNK_PREFETCH_ENABLED)
def clientInputStreamCreationWindow = get(CLIENT_INPUTSTREAM_CREATION_WINDOW)

// //////////////////////////////////////////////////////
// kerberos //
Expand Down Expand Up @@ -5101,6 +5103,23 @@ object CelebornConf extends Logging {
.booleanConf
.createWithDefault(false)

val CLIENT_CHUNK_PREFETCH_ENABLED: ConfigEntry[Boolean] =
buildConf("celeborn.client.chunk.prefetch.enabled")
.categories("client")
.doc("Whether to enable chunk prefetch when creating CelebornInputStream.")
.version("0.6.0")
.booleanConf
.createWithDefault(false)

val CLIENT_INPUTSTREAM_CREATION_WINDOW: ConfigEntry[Int] =
buildConf("celeborn.client.inputStream.creation.window")
.categories("client")
.doc(s"Window size that CelebornShuffleReader pre-creates CelebornInputStreams, for coalesced scenario" +
s"where multiple Partitions are read")
.version("0.6.0")
.intConf
.createWithDefault(16)

val MAX_DEFAULT_NETTY_THREADS: ConfigEntry[Int] =
buildConf("celeborn.io.maxDefaultNettyThreads")
.categories("network")
2 changes: 2 additions & 0 deletions docs/configuration/client.md
@@ -21,6 +21,7 @@ license: |
| --- | ------- | --------- | ----------- | ----- | ---------- |
| celeborn.client.application.heartbeatInterval | 10s | false | Interval for client to send heartbeat message to master. | 0.3.0 | celeborn.application.heartbeatInterval |
| celeborn.client.application.unregister.enabled | true | false | When true, Celeborn client will inform celeborn master the application is already shutdown during client exit, this allows the cluster to release resources immediately, resulting in resource savings. | 0.3.2 | |
| celeborn.client.chunk.prefetch.enabled | false | false | Whether to enable chunk prefetch when creating CelebornInputStream. | 0.6.0 | |
| celeborn.client.closeIdleConnections | true | false | Whether client will close idle connections. | 0.3.0 | |
| celeborn.client.commitFiles.ignoreExcludedWorker | false | false | When true, LifecycleManager will skip workers which are in the excluded list. | 0.3.0 | |
| celeborn.client.eagerlyCreateInputStream.threads | 32 | false | Threads count for streamCreatorPool in CelebornShuffleReader. | 0.3.1 | |
@@ -39,6 +40,7 @@ license: |
| celeborn.client.flink.inputGate.supportFloatingBuffer | true | false | Whether to support floating buffer in Flink input gates. | 0.3.0 | remote-shuffle.job.support-floating-buffer-per-input-gate |
| celeborn.client.flink.resultPartition.memory | 64m | false | Memory reserved for a result partition. | 0.3.0 | remote-shuffle.job.memory-per-partition |
| celeborn.client.flink.resultPartition.supportFloatingBuffer | true | false | Whether to support floating buffer for result partitions. | 0.3.0 | remote-shuffle.job.support-floating-buffer-per-output-gate |
| celeborn.client.inputStream.creation.window | 16 | false | Window size that CelebornShuffleReader pre-creates CelebornInputStreams, for coalesced scenarios where multiple partitions are read. | 0.6.0 | |
| celeborn.client.mr.pushData.max | 32m | false | Max size for a push data sent from mr client. | 0.4.0 | |
| celeborn.client.push.buffer.initial.size | 8k | false | | 0.3.0 | celeborn.push.buffer.initial.size |
| celeborn.client.push.buffer.max.size | 64k | false | Max size of reducer partition buffer memory for shuffle hash writer. The pushed data will be buffered in memory before sending to Celeborn worker. For performance consideration keep this buffer size higher than 32K. Example: If reducer amount is 2000, buffer size is 64K, then each task will consume up to `64KiB * 2000 = 125MiB` heap memory. | 0.3.0 | celeborn.push.buffer.max.size |
@@ -34,19 +34,26 @@ class ReusedExchangeSuite extends AnyFunSuite
ShuffleClient.reset()
}

test("[CELEBORN-980] Asynchronously delete original files to fix ReusedExchange bug") {
testReusedExchange(false)
Array(true, false).foreach { chunkPrefetch =>
test(s"[CELEBORN-980] Asynchronously delete original files" +
s"to fix ReusedExchange bug, chunkPrefetch: $chunkPrefetch") {
testReusedExchange(false, false)
}
}

test("[CELEBORN-1177] OpenStream should register stream via ChunkStreamManager to close stream for ReusedExchange") {
testReusedExchange(true)
Array(true, false).foreach { chunkPrefetch =>
test("[CELEBORN-1177] OpenStream should register stream via ChunkStreamManager" +
s"to close stream for ReusedExchange, chunkPrefetch: $chunkPrefetch") {
testReusedExchange(true, false)
}
}

def testReusedExchange(readLocalShuffle: Boolean): Unit = {
def testReusedExchange(readLocalShuffle: Boolean, prefetch: Boolean): Unit = {
val sparkConf = new SparkConf().setAppName("celeborn-test").setMaster("local[2]")
.set("spark.shuffle.manager", "org.apache.spark.shuffle.celeborn.SparkShuffleManager")
.set(s"spark.${CelebornConf.MASTER_ENDPOINTS.key}", masterInfo._1.rpcEnv.address.toString)
.set(s"spark.${CelebornConf.READ_LOCAL_SHUFFLE_FILE.key}", readLocalShuffle.toString)
.set(s"spark.${CelebornConf.CLIENT_CHUNK_PREFETCH_ENABLED.key}", prefetch.toString)
.set("spark.sql.autoBroadcastJoinThreshold", "-1")
.set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "100")
.set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "100")
@@ -17,10 +17,12 @@

package org.apache.celeborn.tests.spark.memory

import java.io.File

import scala.util.Random

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.internal.SQLConf
import org.scalatest.BeforeAndAfterEach
import org.scalatest.funsuite.AnyFunSuite
@@ -33,6 +35,8 @@ class MemorySkewJoinSuite extends AnyFunSuite
with MemorySparkTestBase
with BeforeAndAfterEach {

var lastResult: Row = _

override def beforeEach(): Unit = {
ShuffleClient.reset()
}
@@ -47,67 +51,104 @@ class MemorySkewJoinSuite extends AnyFunSuite
.set(s"spark.${CelebornConf.SHUFFLE_PARTITION_SPLIT_THRESHOLD.key}", "10MB")
}

val sparkConf = new SparkConf().setAppName("celeborn-demo")
.setMaster("local[2]")
.set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, "true")
.set("spark.sql.adaptive.skewJoin.enabled", "true")
.set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "16MB")
.set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "12MB")
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.set("spark.sql.adaptive.autoBroadcastJoinThreshold", "-1")
.set(SQLConf.PARQUET_COMPRESSION.key, "gzip")
.set(s"spark.${CelebornConf.SHUFFLE_RANGE_READ_FILTER_ENABLED.key}", "true")

val sparkSession = SparkSession.builder.config(sparkConf).getOrCreate()
if (sparkSession.version.startsWith("3")) {
import sparkSession.implicits._
val df = sparkSession.sparkContext.parallelize(1 to 120000, 8)
.map(_ => {
val random = new Random()
val oriKey = random.nextInt(64)
val key = if (oriKey < 32) 1 else oriKey
val fas = random.nextInt(1200000)
val fa = Range(fas, fas + 100).mkString(",")
val fbs = random.nextInt(1200000)
val fb = Range(fbs, fbs + 100).mkString(",")
val fcs = random.nextInt(1200000)
val fc = Range(fcs, fcs + 100).mkString(",")
val fds = random.nextInt(1200000)
val fd = Range(fds, fds + 100).mkString(",")

(key, fa, fb, fc, fd)
})
.toDF("key", "fa", "fb", "fc", "fd")
df.createOrReplaceTempView("view1")
val df2 = sparkSession.sparkContext.parallelize(1 to 8, 8)
.map(i => {
val random = new Random()
val oriKey = random.nextInt(64)
val key = if (oriKey < 32) 1 else oriKey
val fas = random.nextInt(1200000)
val fa = Range(fas, fas + 100).mkString(",")
val fbs = random.nextInt(1200000)
val fb = Range(fbs, fbs + 100).mkString(",")
val fcs = random.nextInt(1200000)
val fc = Range(fcs, fcs + 100).mkString(",")
val fds = random.nextInt(1200000)
val fd = Range(fds, fds + 100).mkString(",")
(key, fa, fb, fc, fd)
})
.toDF("key", "fa", "fb", "fc", "fd")
df2.createOrReplaceTempView("view2")
new File("./df1").delete()
new File("./df2").delete()
df.write.parquet("./df1")
df2.write.parquet("./df2")
sparkSession.close()
}

CompressionCodec.values.foreach { codec =>
test(s"celeborn spark integration test - skew join - $codec") {
val sparkConf = new SparkConf().setAppName("celeborn-demo")
.setMaster("local[2]")
.set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, "true")
.set("spark.sql.adaptive.skewJoin.enabled", "true")
.set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "16MB")
.set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "12MB")
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.set("spark.sql.adaptive.autoBroadcastJoinThreshold", "-1")
.set(s"spark.celeborn.storage.availableTypes", "HDD,MEMORY")
.set(SQLConf.PARQUET_COMPRESSION.key, "gzip")
.set(s"spark.${CelebornConf.SHUFFLE_COMPRESSION_CODEC.key}", codec.name)
.set(s"spark.${CelebornConf.SHUFFLE_RANGE_READ_FILTER_ENABLED.key}", "true")

enableCeleborn(sparkConf)

val sparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
if (sparkSession.version.startsWith("3")) {
import sparkSession.implicits._
val df = sparkSession.sparkContext.parallelize(1 to 120000, 8)
.map(i => {
val random = new Random()
val oriKey = random.nextInt(64)
val key = if (oriKey < 32) 1 else oriKey
val fas = random.nextInt(1200000)
val fa = Range(fas, fas + 100).mkString(",")
val fbs = random.nextInt(1200000)
val fb = Range(fbs, fbs + 100).mkString(",")
val fcs = random.nextInt(1200000)
val fc = Range(fcs, fcs + 100).mkString(",")
val fds = random.nextInt(1200000)
val fd = Range(fds, fds + 100).mkString(",")

(key, fa, fb, fc, fd)
})
.toDF("fa", "f1", "f2", "f3", "f4")
df.createOrReplaceTempView("view1")
val df2 = sparkSession.sparkContext.parallelize(1 to 8, 8)
.map(i => {
val random = new Random()
val oriKey = random.nextInt(64)
val key = if (oriKey < 32) 1 else oriKey
val fas = random.nextInt(1200000)
val fa = Range(fas, fas + 100).mkString(",")
val fbs = random.nextInt(1200000)
val fb = Range(fbs, fbs + 100).mkString(",")
val fcs = random.nextInt(1200000)
val fc = Range(fcs, fcs + 100).mkString(",")
val fds = random.nextInt(1200000)
val fd = Range(fds, fds + 100).mkString(",")
(key, fa, fb, fc, fd)
})
.toDF("fb", "f6", "f7", "f8", "f9")
df2.createOrReplaceTempView("view2")
sparkSession.sql("drop table if exists fres")
sparkSession.sql("create table fres using parquet as select * from view1 a inner join view2 b on a.fa=b.fb where a.fa=1 ")
sparkSession.sql("drop table fres")
sparkSession.stop()
Array(true, false).foreach { enablePrefetch =>
test(s"celeborn spark integration test - skew join - $codec $enablePrefetch") {
val sparkConf = new SparkConf().setAppName("celeborn-demo")
.setMaster("local[2]")
.set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, "true")
.set("spark.sql.adaptive.skewJoin.enabled", "true")
.set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "16MB")
.set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "12MB")
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.set("spark.sql.adaptive.autoBroadcastJoinThreshold", "-1")
.set(s"spark.celeborn.storage.availableTypes", "HDD,MEMORY")
.set(SQLConf.PARQUET_COMPRESSION.key, "gzip")
.set(s"spark.${CelebornConf.SHUFFLE_COMPRESSION_CODEC.key}", codec.name)
.set(s"spark.${CelebornConf.SHUFFLE_RANGE_READ_FILTER_ENABLED.key}", "true")
.set(s"spark.${CelebornConf.CLIENT_CHUNK_PREFETCH_ENABLED.key}", enablePrefetch.toString)

enableCeleborn(sparkConf)

val sparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
if (sparkSession.version.startsWith("3")) {
sparkSession.read.parquet("./df1").repartition(8).createOrReplaceTempView("df1")
sparkSession.read.parquet("./df2").repartition(8).createOrReplaceTempView("df2")
val result = sparkSession.sql("select count(*), max(df1.fa), max(df1.fb), max(df1.fc)," +
"max(df1.fd), max(df2.fa), max(df2.fb), max(df2.fc), max(df2.fd)" +
"from df1 join df2 on df1.key=df2.key and df1.key=1").collect()
if (lastResult == null) {
lastResult = result(0)
} else {
assert((lastResult.getLong(0) == result(0).getLong(0)) &&
(lastResult.getString(1) == result(0).getString(1)) &&
(lastResult.getString(2) == result(0).getString(2)) &&
(lastResult.getString(3) == result(0).getString(3)) &&
(lastResult.getString(4) == result(0).getString(4)) &&
(lastResult.getString(5) == result(0).getString(5)) &&
(lastResult.getString(6) == result(0).getString(6)) &&
(lastResult.getString(7) == result(0).getString(7)) &&
(lastResult.getString(8) == result(0).getString(8)))
}
sparkSession.stop()
}
}
}
}

}
