[CELEBORN-1369] Support disabling fallback to Spark's default shuffle
### What changes were proposed in this pull request?
A new configuration, `celeborn.client.spark.shuffle.fallback.policy`, is added so that fallback to Spark's default shuffle can be disabled.

### Why are the changes needed?
It is dangerous to fall back to the external (Spark default) shuffle when applications run on both online and offline nodes, because online services could be impacted by the resulting shortage of disk capacity.

### Does this PR introduce _any_ user-facing change?
Yes. Fallback to Spark's default shuffle can be disabled by setting `celeborn.client.spark.shuffle.fallback.policy=NEVER`.
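
For illustration only (not part of this patch), a Spark application could opt into the new policy as sketched below. The `spark.celeborn.` pass-through prefix and the `SparkShuffleManager` class name are assumptions about a typical Celeborn-on-Spark setup and may differ in your deployment:

```scala
import org.apache.spark.sql.SparkSession

// Sketch: run on Celeborn and fail fast instead of silently falling back to
// Spark's built-in shuffle. Keys assume Celeborn client settings are passed
// through Spark with a "spark.celeborn." prefix.
object CelebornNoFallbackSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("celeborn-no-fallback-sketch")
      .config("spark.shuffle.manager", "org.apache.spark.shuffle.celeborn.SparkShuffleManager")
      .config("spark.celeborn.client.spark.shuffle.fallback.policy", "NEVER")
      .getOrCreate()

    spark.range(0, 1000).repartition(8).count() // triggers a shuffle
    spark.stop()
  }
}
```

With `NEVER`, a shuffle that would otherwise fall back fails with a `CelebornIOException` instead of quietly using the nodes' local disks.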

### How was this patch tested?
Manual test.

Closes apache#2444 from littlexyw/fallback_disable.

Lead-authored-by: xinyuwang1 <xinyuwang1@xiaohongshu.com>
Co-authored-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
xinyuwang1 and waitinfuture committed May 3, 2024
1 parent 9f30479 commit 7b1645f
Showing 6 changed files with 105 additions and 29 deletions.
@@ -19,25 +19,33 @@ package org.apache.spark.shuffle.celeborn
 
 import org.apache.celeborn.client.LifecycleManager
 import org.apache.celeborn.common.CelebornConf
+import org.apache.celeborn.common.exception.CelebornIOException
 import org.apache.celeborn.common.internal.Logging
+import org.apache.celeborn.common.protocol.FallbackPolicy
 
 class CelebornShuffleFallbackPolicyRunner(conf: CelebornConf) extends Logging {
+  private val shuffleFallbackPolicy = conf.shuffleFallbackPolicy
 
   def applyAllFallbackPolicy(lifecycleManager: LifecycleManager, numPartitions: Int): Boolean = {
-    applyForceFallbackPolicy() || applyShufflePartitionsFallbackPolicy(numPartitions) ||
-    !checkQuota(lifecycleManager) || !checkWorkersAvailable(lifecycleManager)
+    val needFallback =
+      applyForceFallbackPolicy() || applyShufflePartitionsFallbackPolicy(numPartitions) ||
+        !checkQuota(lifecycleManager) || !checkWorkersAvailable(lifecycleManager)
+    if (needFallback && FallbackPolicy.NEVER.equals(shuffleFallbackPolicy)) {
+      throw new CelebornIOException("Fallback to Spark's default shuffle is prohibited.")
+    }
+    needFallback
   }
 
   /**
-   * if celeborn.shuffle.forceFallback.enabled is true, fallback to external shuffle
-   * @return return celeborn.shuffle.forceFallback.enabled
+   * if celeborn.client.spark.shuffle.fallback.policy is ALWAYS, fallback to external shuffle
+   * @return return if celeborn.client.spark.shuffle.fallback.policy is ALWAYS
    */
   def applyForceFallbackPolicy(): Boolean = {
-    if (conf.shuffleForceFallbackEnabled) {
-      val conf = CelebornConf.SPARK_SHUFFLE_FORCE_FALLBACK_ENABLED
-      logWarning(s"${conf.alternatives.foldLeft(conf.key)((x, y) => s"$x or $y")} is enabled, which will force fallback.")
+    if (FallbackPolicy.ALWAYS.equals(shuffleFallbackPolicy)) {
+      logWarning(
+        s"${CelebornConf.SPARK_SHUFFLE_FALLBACK_POLICY.key} is ${FallbackPolicy.ALWAYS.name}, which will force fallback.")
     }
-    conf.shuffleForceFallbackEnabled
+    FallbackPolicy.ALWAYS.equals(shuffleFallbackPolicy)
   }
 
   /**
@@ -19,25 +19,33 @@ package org.apache.spark.shuffle.celeborn
 
 import org.apache.celeborn.client.LifecycleManager
 import org.apache.celeborn.common.CelebornConf
+import org.apache.celeborn.common.exception.CelebornIOException
 import org.apache.celeborn.common.internal.Logging
+import org.apache.celeborn.common.protocol.FallbackPolicy
 
 class CelebornShuffleFallbackPolicyRunner(conf: CelebornConf) extends Logging {
+  private val shuffleFallbackPolicy = conf.shuffleFallbackPolicy
 
   def applyAllFallbackPolicy(lifecycleManager: LifecycleManager, numPartitions: Int): Boolean = {
-    applyForceFallbackPolicy() || applyShufflePartitionsFallbackPolicy(numPartitions) ||
-    !checkQuota(lifecycleManager) || !checkWorkersAvailable(lifecycleManager)
+    val needFallback =
+      applyForceFallbackPolicy() || applyShufflePartitionsFallbackPolicy(numPartitions) ||
+        !checkQuota(lifecycleManager) || !checkWorkersAvailable(lifecycleManager)
+    if (needFallback && FallbackPolicy.NEVER.equals(shuffleFallbackPolicy)) {
+      throw new CelebornIOException("Fallback to Spark's default shuffle is prohibited.")
+    }
+    needFallback
  }
 
   /**
-   * if celeborn.shuffle.forceFallback.enabled is true, fallback to external shuffle
-   * @return return celeborn.shuffle.forceFallback.enabled
+   * if celeborn.client.spark.shuffle.fallback.policy is ALWAYS, fallback to external shuffle
+   * @return return if celeborn.client.spark.shuffle.fallback.policy is ALWAYS
    */
   def applyForceFallbackPolicy(): Boolean = {
-    if (conf.shuffleForceFallbackEnabled) {
-      val conf = CelebornConf.SPARK_SHUFFLE_FORCE_FALLBACK_ENABLED
-      logWarning(s"${conf.alternatives.foldLeft(conf.key)((x, y) => s"$x or $y")} is enabled, which will force fallback.")
+    if (FallbackPolicy.ALWAYS.equals(shuffleFallbackPolicy)) {
+      logWarning(
+        s"${CelebornConf.SPARK_SHUFFLE_FALLBACK_POLICY.key} is ${FallbackPolicy.ALWAYS.name}, which will force fallback.")
     }
-    conf.shuffleForceFallbackEnabled
+    FallbackPolicy.ALWAYS.equals(shuffleFallbackPolicy)
  }
 
   /**
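
The same change lands in two copies of `CelebornShuffleFallbackPolicyRunner` above. As a condensed, standalone sketch of the decision the runner now makes (illustrative only, not Celeborn's API: a plain `IOException` stands in for `CelebornIOException`, and the partition-threshold comparison is assumed from the config documentation):

```scala
import java.io.IOException

// Standalone sketch of the fallback decision; names are illustrative.
object FallbackDecisionSketch {
  sealed trait Policy
  case object Always extends Policy
  case object Auto extends Policy
  case object Never extends Policy

  /** Returns true when the job should fall back to Spark's default shuffle. */
  def shouldFallback(
      policy: Policy,
      workersAvailable: Boolean,
      quotaOk: Boolean,
      numPartitions: Int,
      partitionThreshold: Int): Boolean = {
    val needFallback =
      policy == Always || numPartitions >= partitionThreshold || !quotaOk || !workersAvailable
    // New in this patch: with policy NEVER, a needed fallback fails the job
    // instead of silently switching to Spark's built-in shuffle.
    if (needFallback && policy == Never) {
      throw new IOException("Fallback to Spark's default shuffle is prohibited.")
    }
    needFallback
  }

  def main(args: Array[String]): Unit = {
    // AUTO with healthy workers and quota: stay on Celeborn.
    assert(!shouldFallback(Auto, workersAvailable = true, quotaOk = true, numPartitions = 100, partitionThreshold = 1000))
    // ALWAYS: fall back regardless of cluster state.
    assert(shouldFallback(Always, workersAvailable = true, quotaOk = true, numPartitions = 100, partitionThreshold = 1000))
  }
}
```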
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.common.protocol;
+
+public enum FallbackPolicy {
+  ALWAYS,
+  NEVER,
+  AUTO
+}
55 changes: 44 additions & 11 deletions common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -998,6 +998,15 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se
   def shuffleRangeReadFilterEnabled: Boolean = get(SHUFFLE_RANGE_READ_FILTER_ENABLED)
   def shuffleForceFallbackEnabled: Boolean = get(SPARK_SHUFFLE_FORCE_FALLBACK_ENABLED)
   def checkWorkerEnabled: Boolean = get(CHECK_WORKER_ENABLED)
+  def shuffleFallbackPolicy: FallbackPolicy = {
+    val fallbackPolicyGiven = FallbackPolicy.valueOf(get(SPARK_SHUFFLE_FALLBACK_POLICY))
+    if (shuffleForceFallbackEnabled && FallbackPolicy.AUTO.equals(fallbackPolicyGiven)) {
+      FallbackPolicy.ALWAYS
+    } else {
+      fallbackPolicyGiven
+    }
+  }
+
   def shuffleForceFallbackPartitionThreshold: Long =
     get(SPARK_SHUFFLE_FORCE_FALLBACK_PARTITION_THRESHOLD)
   def shuffleExpiredCheckIntervalMs: Long = get(SHUFFLE_EXPIRED_CHECK_INTERVAL)
@@ -1466,7 +1475,15 @@ object CelebornConf extends Logging {
       DeprecatedConfig(
         "celeborn.worker.storage.baseDir.number",
         "0.4.0",
-        "Please use celeborn.worker.storage.dirs"))
+        "Please use celeborn.worker.storage.dirs"),
+      DeprecatedConfig(
+        "celeborn.client.spark.shuffle.forceFallback.enabled",
+        "0.5.0",
+        "Please use celeborn.client.spark.shuffle.fallback.policy"),
+      DeprecatedConfig(
+        "celeborn.shuffle.forceFallback.enabled",
+        "0.5.0",
+        "Please use celeborn.client.spark.shuffle.fallback.policy"))
 
     Map(configs.map { cfg => (cfg.key -> cfg) }: _*)
   }
@@ -4339,15 +4356,6 @@ object CelebornConf extends Logging {
       .booleanConf
       .createWithDefault(true)
 
-  val SPARK_SHUFFLE_FORCE_FALLBACK_ENABLED: ConfigEntry[Boolean] =
-    buildConf("celeborn.client.spark.shuffle.forceFallback.enabled")
-      .withAlternative("celeborn.shuffle.forceFallback.enabled")
-      .categories("client")
-      .version("0.3.0")
-      .doc("Whether force fallback shuffle to Spark's default.")
-      .booleanConf
-      .createWithDefault(false)
-
   val CHECK_WORKER_ENABLED: ConfigEntry[Boolean] =
     buildConf("celeborn.client.spark.shuffle.checkWorker.enabled")
       .categories("client")
@@ -4358,13 +4366,38 @@ object CelebornConf extends Logging {
       .booleanConf
       .createWithDefault(true)
 
+  val SPARK_SHUFFLE_FALLBACK_POLICY: ConfigEntry[String] =
+    buildConf("celeborn.client.spark.shuffle.fallback.policy")
+      .categories("client")
+      .version("0.5.0")
+      .doc(
+        s"Celeborn supports the following kind of fallback policies. 1. ${FallbackPolicy.ALWAYS.name}: force fallback shuffle to Spark's default; " +
+          s"2. ${FallbackPolicy.AUTO.name}: consider other factors like availability of enough workers and quota, or whether shuffle of partition number is lower than celeborn.client.spark.shuffle.forceFallback.numPartitionsThreshold; " +
+          s"3. ${FallbackPolicy.NEVER.name}: the job will fail if it is concluded that fallback is required based on factors above.")
+      .stringConf
+      .transform(_.toUpperCase(Locale.ROOT))
+      .checkValues(Set(
+        FallbackPolicy.ALWAYS.name,
+        FallbackPolicy.AUTO.name,
+        FallbackPolicy.NEVER.name))
+      .createWithDefault(FallbackPolicy.AUTO.name)
+
+  val SPARK_SHUFFLE_FORCE_FALLBACK_ENABLED: ConfigEntry[Boolean] =
+    buildConf("celeborn.client.spark.shuffle.forceFallback.enabled")
+      .withAlternative("celeborn.shuffle.forceFallback.enabled")
+      .categories("client")
+      .version("0.3.0")
+      .doc(s"Whether force fallback shuffle to Spark's default. This configuration only takes effect when ${CelebornConf.SPARK_SHUFFLE_FALLBACK_POLICY.key} is ${FallbackPolicy.AUTO.name}.")
+      .booleanConf
+      .createWithDefault(false)
+
   val SPARK_SHUFFLE_FORCE_FALLBACK_PARTITION_THRESHOLD: ConfigEntry[Long] =
     buildConf("celeborn.client.spark.shuffle.forceFallback.numPartitionsThreshold")
       .withAlternative("celeborn.shuffle.forceFallback.numPartitionsThreshold")
       .categories("client")
       .version("0.3.0")
       .doc(
-        "Celeborn will only accept shuffle of partition number lower than this configuration value.")
+        s"Celeborn will only accept shuffle of partition number lower than this configuration value. This configuration only takes effect when ${CelebornConf.SPARK_SHUFFLE_FALLBACK_POLICY.key} is ${FallbackPolicy.AUTO.name}.")
       .longConf
       .createWithDefault(Int.MaxValue)
 
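The new `shuffleFallbackPolicy` accessor above resolves the configured policy against the deprecated boolean. A standalone sketch of that precedence (illustrative only, not Celeborn's API): the deprecated flag upgrades the effective policy to `ALWAYS` only while the configured policy is `AUTO`; an explicit `ALWAYS` or `NEVER` is left untouched.

```scala
// Standalone sketch of the effective-policy resolution; not Celeborn's API.
object FallbackPolicyResolutionSketch {
  sealed trait Policy
  case object Always extends Policy
  case object Auto extends Policy
  case object Never extends Policy

  def effectivePolicy(configured: Policy, deprecatedForceFallback: Boolean): Policy =
    if (deprecatedForceFallback && configured == Auto) Always else configured

  def main(args: Array[String]): Unit = {
    assert(effectivePolicy(Auto, deprecatedForceFallback = true) == Always) // old flag still forces fallback
    assert(effectivePolicy(Never, deprecatedForceFallback = true) == Never) // explicit NEVER wins
    assert(effectivePolicy(Auto, deprecatedForceFallback = false) == Auto) // default behavior unchanged
  }
}
```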
5 changes: 3 additions & 2 deletions docs/configuration/client.md
@@ -108,8 +108,9 @@ license: |
 | celeborn.client.spark.push.sort.memory.useAdaptiveThreshold | false | false | Adaptively adjust sort-based shuffle writer's memory threshold | 0.5.0 | |
 | celeborn.client.spark.push.unsafeRow.fastWrite.enabled | true | false | This is Celeborn's optimization on UnsafeRow for Spark and it's true by default. If you have changed UnsafeRow's memory layout set this to false. | 0.2.2 | |
 | celeborn.client.spark.shuffle.checkWorker.enabled | true | false | When true, before registering shuffle, LifecycleManager should check if current cluster have available workers, if cluster don't have available workers, fallback to Spark's default shuffle | 0.5.0 | |
-| celeborn.client.spark.shuffle.forceFallback.enabled | false | false | Whether force fallback shuffle to Spark's default. | 0.3.0 | celeborn.shuffle.forceFallback.enabled |
-| celeborn.client.spark.shuffle.forceFallback.numPartitionsThreshold | 2147483647 | false | Celeborn will only accept shuffle of partition number lower than this configuration value. | 0.3.0 | celeborn.shuffle.forceFallback.numPartitionsThreshold |
+| celeborn.client.spark.shuffle.fallback.policy | AUTO | false | Celeborn supports the following kind of fallback policies. 1. ALWAYS: force fallback shuffle to Spark's default; 2. AUTO: consider other factors like availability of enough workers and quota, or whether shuffle of partition number is lower than celeborn.client.spark.shuffle.forceFallback.numPartitionsThreshold; 3. NEVER: the job will fail if it is concluded that fallback is required based on factors above. | 0.5.0 | |
+| celeborn.client.spark.shuffle.forceFallback.enabled | false | false | Whether force fallback shuffle to Spark's default. This configuration only takes effect when celeborn.client.spark.shuffle.fallback.policy is AUTO. | 0.3.0 | celeborn.shuffle.forceFallback.enabled |
+| celeborn.client.spark.shuffle.forceFallback.numPartitionsThreshold | 2147483647 | false | Celeborn will only accept shuffle of partition number lower than this configuration value. This configuration only takes effect when celeborn.client.spark.shuffle.fallback.policy is AUTO. | 0.3.0 | celeborn.shuffle.forceFallback.numPartitionsThreshold |
 | celeborn.client.spark.shuffle.writer | HASH | false | Celeborn supports the following kind of shuffle writers. 1. hash: hash-based shuffle writer works fine when shuffle partition count is normal; 2. sort: sort-based shuffle writer works fine when memory pressure is high or shuffle partition count is huge. This configuration only takes effect when celeborn.client.spark.push.dynamicWriteMode.enabled is false. | 0.3.0 | celeborn.shuffle.writer |
 | celeborn.master.endpoints | &lt;localhost&gt;:9097 | false | Endpoints of master nodes for celeborn client to connect, allowed pattern is: `<host1>:<port1>[,<host2>:<port2>]*`, e.g. `clb1:9097,clb2:9098,clb3:9099`. If the port is omitted, 9097 will be used. | 0.2.0 | |
 | celeborn.quota.enabled | true | false | When Master side sets to true, the master will enable to check the quota via QuotaManager. When Client side sets to true, LifecycleManager will request Master side to check whether the current user has enough quota before registration of shuffle. Fallback to the default shuffle service of Spark when Master side checks that there is no enough quota for current user. | 0.2.0 | |
2 changes: 2 additions & 0 deletions docs/migration.md
@@ -38,6 +38,8 @@ license: |
 
 - Since 0.5.0, Celeborn client removes configuration `celeborn.client.push.splitPartition.threads`, `celeborn.client.flink.inputGate.minMemory` and `celeborn.client.flink.resultPartition.minMemory`.
 
+- Since 0.5.0, Celeborn deprecate `celeborn.client.spark.shuffle.forceFallback.enabled`. Please use `celeborn.client.spark.shuffle.fallback.policy` instead.
+
 ## Upgrading from 0.4.0 to 0.4.1
 
 - Since 0.4.1, Celeborn master adds a limit to the estimated partition size used for computing worker slots.
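
As a hedged migration illustration (not part of this patch; the `spark.celeborn.` prefix is an assumption about how Celeborn client settings are passed through Spark), the deprecated flag maps onto the new policy roughly as follows:

```scala
import org.apache.spark.SparkConf

object FallbackMigrationSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    // Before 0.5.0 (deprecated; only honored while the policy stays AUTO):
    //   conf.set("spark.celeborn.client.spark.shuffle.forceFallback.enabled", "true")
    // Since 0.5.0, express the same intent with the policy instead:
    conf.set("spark.celeborn.client.spark.shuffle.fallback.policy", "ALWAYS") // or AUTO / NEVER
    println(conf.get("spark.celeborn.client.spark.shuffle.fallback.policy"))
  }
}
```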
