[CELEBORN-1535] Support to disable master workerUnavailableInfo expiration

### What changes were proposed in this pull request?

This PR adds support for disabling the expiration of worker unavailable info by setting `celeborn.master.workerUnavailableInfo.expireTimeout` to -1.
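
As a hedged illustration of the setting described above, and assuming the master reads its options from `conf/celeborn-defaults.conf` in the usual space-separated key/value form (the file location is an assumption, not something this commit states), disabling the expiration might look like this:

```properties
# Assumed location: conf/celeborn-defaults.conf on the master node.
# -1 disables expiration of worker unavailable info; the default is 1800s.
celeborn.master.workerUnavailableInfo.expireTimeout -1
```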

### Why are the changes needed?

In our use case, we want to retain all worker unavailable information.
This is acceptable when fixed ports and hosts are used, because the retained information will not occupy much memory.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?

Not needed.

Closes apache#2657 from turboFei/disable_Cleanup.

Authored-by: Wang, Fei <fwang12@ebay.com>
Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>
turboFei authored and RexXiong committed Aug 7, 2024
1 parent ca15eea commit a599ff2
Showing 3 changed files with 14 additions and 11 deletions.
@@ -2184,7 +2184,8 @@ object CelebornConf extends Logging {
     buildConf("celeborn.master.workerUnavailableInfo.expireTimeout")
       .categories("master")
       .version("0.3.1")
-      .doc("Worker unavailable info would be cleared when the retention period is expired")
+      .doc("Worker unavailable info would be cleared when the retention period is expired." +
+        " Set -1 to disable the expiration.")
       .timeConf(TimeUnit.MILLISECONDS)
       .createWithDefaultString("1800s")

2 changes: 1 addition & 1 deletion docs/configuration/master.md
@@ -68,7 +68,7 @@ license: |
 | celeborn.master.slot.assign.maxWorkers | 10000 | false | Max workers that slots of one shuffle can be allocated on. Will choose the smaller positive one from Master side and Client side, see `celeborn.client.slot.assign.maxWorkers`. | 0.3.1 | |
 | celeborn.master.slot.assign.policy | ROUNDROBIN | false | Policy for master to assign slots, Celeborn supports two types of policy: roundrobin and loadaware. Loadaware policy will be ignored when `HDFS` is enabled in `celeborn.storage.activeTypes` | 0.3.0 | celeborn.slots.assign.policy |
 | celeborn.master.userResourceConsumption.update.interval | 30s | false | Time length for a window about compute user resource consumption. | 0.3.0 | |
-| celeborn.master.workerUnavailableInfo.expireTimeout | 1800s | false | Worker unavailable info would be cleared when the retention period is expired | 0.3.1 | |
+| celeborn.master.workerUnavailableInfo.expireTimeout | 1800s | false | Worker unavailable info would be cleared when the retention period is expired. Set -1 to disable the expiration. | 0.3.1 | |
 | celeborn.quota.enabled | true | false | When Master side sets to true, the master will enable to check the quota via QuotaManager. When Client side sets to true, LifecycleManager will request Master side to check whether the current user has enough quota before registration of shuffle. Fallback to the default shuffle service of Spark when Master side checks that there is no enough quota for current user. | 0.2.0 | |
 | celeborn.redaction.regex | (?i)secret|password|token|access[.]key | false | Regex to decide which Celeborn configuration properties and environment variables in master and worker environments contain sensitive information. When this regex matches a property key or value, the value is redacted from the logging. | 0.5.0 | |
 | celeborn.storage.availableTypes | HDD | false | Enabled storages. Available options: MEMORY,HDD,SSD,HDFS. Note: HDD and SSD would be treated as identical. | 0.3.0 | celeborn.storage.activeTypes |
@@ -311,15 +311,17 @@ private[celeborn] class Master(
       appHeartbeatTimeoutMs / 2,
       TimeUnit.MILLISECONDS)
 
-    checkForUnavailableWorkerTimeOutTask = forwardMessageThread.scheduleWithFixedDelay(
-      new Runnable {
-        override def run(): Unit = Utils.tryLogNonFatalError {
-          self.send(CheckForWorkerUnavailableInfoTimeout)
-        }
-      },
-      0,
-      workerUnavailableInfoExpireTimeoutMs / 2,
-      TimeUnit.MILLISECONDS)
+    if (workerUnavailableInfoExpireTimeoutMs > 0) {
+      checkForUnavailableWorkerTimeOutTask = forwardMessageThread.scheduleWithFixedDelay(
+        new Runnable {
+          override def run(): Unit = Utils.tryLogNonFatalError {
+            self.send(CheckForWorkerUnavailableInfoTimeout)
+          }
+        },
+        0,
+        workerUnavailableInfoExpireTimeoutMs / 2,
+        TimeUnit.MILLISECONDS)
+    }
 
     if (hasHDFSStorage || hasS3Storage) {
       checkForS3RemnantDirsTimeOutTask = forwardMessageThread.scheduleWithFixedDelay(
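
The guard in the hunk above can be sketched in isolation. The following is a minimal, standalone illustration and not Celeborn code: `ExpirationGuardSketch` and `scheduleExpirationCheck` are hypothetical names, and a plain `ScheduledExecutorService` stands in for the master's `forwardMessageThread`. The guard matters because `scheduleWithFixedDelay` throws `IllegalArgumentException` for a non-positive delay, which is what `-1 / 2` (integer division yields 0) would produce without it.

```scala
import java.util.concurrent.{Executors, ScheduledExecutorService, ScheduledFuture, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

object ExpirationGuardSketch {
  // Hypothetical helper, not part of Celeborn: schedules the periodic check
  // only when the timeout is positive. A non-positive timeout (for example -1)
  // means expiration is disabled, so nothing is scheduled and None is returned.
  def scheduleExpirationCheck(
      scheduler: ScheduledExecutorService,
      expireTimeoutMs: Long)(check: () => Unit): Option[ScheduledFuture[_]] = {
    if (expireTimeoutMs > 0) {
      // Same shape as the commit: no initial delay, then half the timeout
      // between the end of one run and the start of the next.
      Some(scheduler.scheduleWithFixedDelay(
        new Runnable {
          override def run(): Unit = check()
        },
        0,
        expireTimeoutMs / 2,
        TimeUnit.MILLISECONDS))
    } else {
      None
    }
  }
}
```

With this sketch, calling `scheduleExpirationCheck(scheduler, -1L)(...)` returns `None` and never runs the check, while a positive timeout such as `1800000L` returns a handle that the caller can later cancel.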