Skip to content

Commit

Permalink
[CELEBORN-1531] Refactor self checks in master
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?
as title

### Why are the changes needed?

add a scheduleCheckTask method  to refactor some code

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

Closes apache#2653 from zhaohehuhu/dev-0731.

Authored-by: zhaohehuhu <luoyedeyi@163.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
  • Loading branch information
zhaohehuhu authored and FMX committed Aug 13, 2024
1 parent 6b24f5d commit 960ba24
Showing 1 changed file with 19 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -294,52 +294,36 @@ private[celeborn] class Master(
sendApplicationMetaThreads,
"send-application-meta")
}
checkForWorkerTimeOutTask = forwardMessageThread.scheduleWithFixedDelay(
new Runnable {
override def run(): Unit = Utils.tryLogNonFatalError {
self.send(ControlMessages.pbCheckForWorkerTimeout)
}
},
0,
workerHeartbeatTimeoutMs,
TimeUnit.MILLISECONDS)

checkForApplicationTimeOutTask = forwardMessageThread.scheduleWithFixedDelay(
new Runnable {
override def run(): Unit = Utils.tryLogNonFatalError {
self.send(CheckForApplicationTimeOut)
}
},
0,
appHeartbeatTimeoutMs / 2,
TimeUnit.MILLISECONDS)
checkForWorkerTimeOutTask = scheduleCheckTask(workerHeartbeatTimeoutMs, pbCheckForWorkerTimeout)
checkForApplicationTimeOutTask =
scheduleCheckTask(appHeartbeatTimeoutMs / 2, CheckForApplicationTimeOut)

if (workerUnavailableInfoExpireTimeoutMs > 0) {
checkForUnavailableWorkerTimeOutTask = forwardMessageThread.scheduleWithFixedDelay(
new Runnable {
override def run(): Unit = Utils.tryLogNonFatalError {
self.send(CheckForWorkerUnavailableInfoTimeout)
}
},
0,
scheduleCheckTask(
workerUnavailableInfoExpireTimeoutMs / 2,
TimeUnit.MILLISECONDS)
CheckForWorkerUnavailableInfoTimeout)
}

if (hasHDFSStorage || hasS3Storage) {
checkForS3RemnantDirsTimeOutTask = forwardMessageThread.scheduleWithFixedDelay(
new Runnable {
override def run(): Unit = Utils.tryLogNonFatalError {
self.send(CheckForDFSExpiredDirsTimeout)
}
},
dfsExpireDirsTimeoutMS,
dfsExpireDirsTimeoutMS,
TimeUnit.MILLISECONDS)
checkForHDFSRemnantDirsTimeOutTask =
scheduleCheckTask(dfsExpireDirsTimeoutMS, CheckForDFSExpiredDirsTimeout)
}

}

def scheduleCheckTask[T](timeoutMS: Long, message: T): ScheduledFuture[_] = {
forwardMessageThread.scheduleWithFixedDelay(
new Runnable {
override def run(): Unit = Utils.tryLogNonFatalError {
self.send(message)
}
},
timeoutMS,
timeoutMS,
TimeUnit.MILLISECONDS)
}

override def onStop(): Unit = {
if (!threadsStarted.compareAndSet(true, false)) {
return
Expand Down

0 comments on commit 960ba24

Please sign in to comment.