[CELEBORN-1103][BUG] only clean up expire data for good disks
### What changes were proposed in this pull request?
**When a disk goes bad, cleaning up expired shuffle keys on it can trigger a `NullPointerException` in the thread pool obtained from `diskOperators` in `StorageManager`.
This change therefore cleans up expired shuffle keys only on good disks (status `HEALTHY` or `HIGH_DISK_USAGE`), which avoids the problem.**
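
The difference between the old and new disk filters can be illustrated with a minimal, self-contained sketch. The `DiskStatus` and `DiskInfo` types below are simplified, hypothetical stand-ins rather than Celeborn's real classes. The status values other than `HEALTHY`, `HIGH_DISK_USAGE` and `IO_HANG` are assumptions, as are the mount points.

```scala
// Hypothetical stand-ins for Celeborn's DiskStatus and DiskInfo.
// Only HEALTHY, HIGH_DISK_USAGE and IO_HANG appear in this commit;
// the other status values are assumed for illustration.
object DiskStatus extends Enumeration {
  val HEALTHY, READ_OR_WRITE_FAILURE, IO_HANG, HIGH_DISK_USAGE, CRITICAL_ERROR = Value
}

final case class DiskInfo(mountPoint: String, status: DiskStatus.Value)

object CleanupFilterSketch {
  // Filter used before this commit: every disk except those in IO_HANG.
  def oldFilter(disks: Seq[DiskInfo]): Seq[DiskInfo] =
    disks.filter(_.status != DiskStatus.IO_HANG)

  // Filter introduced by this commit: only disks considered good.
  def newFilter(disks: Seq[DiskInfo]): Seq[DiskInfo] =
    disks.filter(d =>
      d.status == DiskStatus.HEALTHY || d.status == DiskStatus.HIGH_DISK_USAGE)

  // Made-up mount points, one per status value.
  val disks: Seq[DiskInfo] = Seq(
    DiskInfo("/mnt/disk1", DiskStatus.HEALTHY),
    DiskInfo("/mnt/disk2", DiskStatus.READ_OR_WRITE_FAILURE),
    DiskInfo("/mnt/disk3", DiskStatus.IO_HANG),
    DiskInfo("/mnt/disk4", DiskStatus.HIGH_DISK_USAGE),
    DiskInfo("/mnt/disk5", DiskStatus.CRITICAL_ERROR)
  )

  def main(args: Array[String]): Unit = {
    println(oldFilter(disks).map(_.mountPoint))
    println(newFilter(disks).map(_.mountPoint))
  }
}
```

Under these assumptions, the old filter still passes disks in a failed state other than `IO_HANG` to the cleanup path, while the new filter passes only the two good states.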

https://issues.apache.org/jira/browse/CELEBORN-1103

### Why are the changes needed?
bugfix

### Does this PR introduce _any_ user-facing change?

### How was this patch tested?

Closes apache#2060 from suizhe007/CELEBORN-1103.

Lead-authored-by: qinrui <qr7972@gmail.com>
Co-authored-by: qinrui <51885730+suizhe007@users.noreply.github.com>
Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
2 people authored and waitinfuture committed Oct 31, 2023
1 parent 11fe324 commit 232a44b
Showing 1 changed file with 6 additions and 2 deletions.
@@ -560,7 +560,9 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
           }
         }
         val (appId, shuffleId) = Utils.splitShuffleKey(shuffleKey)
-        disksSnapshot().filter(_.status != DiskStatus.IO_HANG).foreach { diskInfo =>
+        disksSnapshot().filter(diskInfo =>
+          diskInfo.status == DiskStatus.HEALTHY
+            || diskInfo.status == DiskStatus.HIGH_DISK_USAGE).foreach { diskInfo =>
           diskInfo.dirs.foreach { dir =>
             val file = new File(dir, s"$appId/$shuffleId")
             deleteDirectory(file, diskOperators.get(diskInfo.mountPoint))
@@ -606,7 +608,9 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
 
   private def cleanupExpiredAppDirs(expireDuration: Long): Unit = {
     val diskInfoAndAppDirs = disksSnapshot()
-      .filter(_.status != DiskStatus.IO_HANG)
+      .filter(diskInfo =>
+        diskInfo.status == DiskStatus.HEALTHY
+          || diskInfo.status == DiskStatus.HIGH_DISK_USAGE)
       .map { case diskInfo =>
         (diskInfo, diskInfo.dirs.filter(_.exists).flatMap(_.listFiles()))
       }
