Skip to content

Commit

Permalink
[CELEBORN-1557] Fix totalSpace of DiskInfo for Master in HA mode
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?

Fix `totalSpace` of `DiskInfo` for Master in HA mode.

### Why are the changes needed?

The `totalSpace` of `DiskInfo` does not sync for Master in HA mode, which causes that the `totalSpace` is incorrect.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

`RatisMasterStatusSystemSuiteJ#testHandleRegisterWorker`

Closes apache#2690 from SteNicholas/CELEBORN-1557.

Authored-by: SteNicholas <programgeek@163.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
  • Loading branch information
SteNicholas authored and FMX committed Aug 19, 2024
1 parent ae41cb5 commit b330b55
Show file tree
Hide file tree
Showing 4 changed files with 81 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -63,13 +63,14 @@ public static Map<String, DiskInfo> fromPbDiskInfos(
(k, v) -> {
DiskInfo diskInfo =
new DiskInfo(
v.getMountPoint(),
v.getUsableSpace(),
v.getAvgFlushTime(),
v.getAvgFetchTime(),
v.getUsedSlots(),
StorageInfo.typesMap.get(v.getStorageType()));
diskInfo.setStatus(Utils.toDiskStatus(v.getStatus()));
v.getMountPoint(),
v.getUsableSpace(),
v.getAvgFlushTime(),
v.getAvgFetchTime(),
v.getUsedSlots(),
StorageInfo.typesMap.get(v.getStorageType()))
.setStatus(Utils.toDiskStatus(v.getStatus()))
.setTotalSpace(v.getTotalSpace());
map.put(k, diskInfo);
});
return map;
Expand All @@ -90,6 +91,7 @@ public static Map<String, ResourceProtos.DiskInfo> toPbDiskInfos(
.setUsedSlots(v.activeSlots())
.setStorageType(v.storageType().getValue())
.setStatus(v.status().getValue())
.setTotalSpace(v.totalSpace())
.build()));
return map;
}
Expand Down
1 change: 1 addition & 0 deletions master/src/main/proto/Resource.proto
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ message DiskInfo {
required int32 status = 5;
required int64 avgFetchTime = 6;
required int32 storageType =7;
optional int64 totalSpace = 8;
}

message RequestSlotsRequest {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -347,10 +347,27 @@ public void testHandleRegisterWorker() throws InterruptedException {
getNewReqeustId());
Thread.sleep(3000L);

Assert.assertEquals(STATUSSYSTEM1.workers.size(), 3);
Assert.assertEquals(3, STATUSSYSTEM1.workers.size());
Assert.assertEquals(3, STATUSSYSTEM2.workers.size());
Assert.assertEquals(3, STATUSSYSTEM3.workers.size());

assertWorkers(STATUSSYSTEM1.workers);
assertWorkers(STATUSSYSTEM2.workers);
assertWorkers(STATUSSYSTEM3.workers);
}

private void assertWorkers(Set<WorkerInfo> workerInfos) {
for (WorkerInfo workerInfo : workerInfos) {
assertWorker(workerInfo);
}
}

private void assertWorker(WorkerInfo workerInfo) {
Map<String, DiskInfo> diskInfos = workerInfo.diskInfos();
Assert.assertEquals(96 * 1024 * 1024 * 1024L, diskInfos.get("disk1").totalSpace());
Assert.assertEquals(96 * 1024 * 1024 * 1024L, diskInfos.get("disk2").totalSpace());
Assert.assertEquals(96 * 1024 * 1024 * 1024L, diskInfos.get("disk3").totalSpace());
Assert.assertEquals(96 * 1024 * 1024 * 1024L, diskInfos.get("disk4").totalSpace());
}

@Test
Expand Down Expand Up @@ -994,22 +1011,58 @@ public void resetStatus() {
STATUSSYSTEM3.workerLostEvents.clear();

disks1.clear();
disks1.put("disk1", new DiskInfo("disk1", 64 * 1024 * 1024 * 1024L, 100, 100, 0));
disks1.put("disk2", new DiskInfo("disk2", 64 * 1024 * 1024 * 1024L, 100, 100, 0));
disks1.put("disk3", new DiskInfo("disk3", 64 * 1024 * 1024 * 1024L, 100, 100, 0));
disks1.put("disk4", new DiskInfo("disk4", 64 * 1024 * 1024 * 1024L, 100, 100, 0));
disks1.put(
"disk1",
new DiskInfo("disk1", 64 * 1024 * 1024 * 1024L, 100, 100, 0)
.setTotalSpace(96 * 1024 * 1024 * 1024L));
disks1.put(
"disk2",
new DiskInfo("disk2", 64 * 1024 * 1024 * 1024L, 100, 100, 0)
.setTotalSpace(96 * 1024 * 1024 * 1024L));
disks1.put(
"disk3",
new DiskInfo("disk3", 64 * 1024 * 1024 * 1024L, 100, 100, 0)
.setTotalSpace(96 * 1024 * 1024 * 1024L));
disks1.put(
"disk4",
new DiskInfo("disk4", 64 * 1024 * 1024 * 1024L, 100, 100, 0)
.setTotalSpace(96 * 1024 * 1024 * 1024L));

disks2.clear();
disks2.put("disk1", new DiskInfo("disk1", 64 * 1024 * 1024 * 1024L, 100, 100, 0));
disks2.put("disk2", new DiskInfo("disk2", 64 * 1024 * 1024 * 1024L, 100, 100, 0));
disks2.put("disk3", new DiskInfo("disk3", 64 * 1024 * 1024 * 1024L, 100, 100, 0));
disks2.put("disk4", new DiskInfo("disk4", 64 * 1024 * 1024 * 1024L, 100, 100, 0));
disks2.put(
"disk1",
new DiskInfo("disk1", 64 * 1024 * 1024 * 1024L, 100, 100, 0)
.setTotalSpace(96 * 1024 * 1024 * 1024L));
disks2.put(
"disk2",
new DiskInfo("disk2", 64 * 1024 * 1024 * 1024L, 100, 100, 0)
.setTotalSpace(96 * 1024 * 1024 * 1024L));
disks2.put(
"disk3",
new DiskInfo("disk3", 64 * 1024 * 1024 * 1024L, 100, 100, 0)
.setTotalSpace(96 * 1024 * 1024 * 1024L));
disks2.put(
"disk4",
new DiskInfo("disk4", 64 * 1024 * 1024 * 1024L, 100, 100, 0)
.setTotalSpace(96 * 1024 * 1024 * 1024L));

disks3.clear();
disks3.put("disk1", new DiskInfo("disk1", 64 * 1024 * 1024 * 1024L, 100, 100, 0));
disks3.put("disk2", new DiskInfo("disk2", 64 * 1024 * 1024 * 1024L, 100, 100, 0));
disks3.put("disk3", new DiskInfo("disk3", 64 * 1024 * 1024 * 1024L, 100, 100, 0));
disks3.put("disk4", new DiskInfo("disk4", 64 * 1024 * 1024 * 1024L, 100, 100, 0));
disks3.put(
"disk1",
new DiskInfo("disk1", 64 * 1024 * 1024 * 1024L, 100, 100, 0)
.setTotalSpace(96 * 1024 * 1024 * 1024L));
disks3.put(
"disk2",
new DiskInfo("disk2", 64 * 1024 * 1024 * 1024L, 100, 100, 0)
.setTotalSpace(96 * 1024 * 1024 * 1024L));
disks3.put(
"disk3",
new DiskInfo("disk3", 64 * 1024 * 1024 * 1024L, 100, 100, 0)
.setTotalSpace(96 * 1024 * 1024 * 1024L));
disks3.put(
"disk4",
new DiskInfo("disk4", 64 * 1024 * 1024 * 1024L, 100, 100, 0)
.setTotalSpace(96 * 1024 * 1024 * 1024L));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -851,13 +851,14 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
0
}
}.sum
val fileSystemReportedUsableSpace = Files.getFileStore(
Paths.get(diskInfo.mountPoint)).getUsableSpace
val fileStore = Files.getFileStore(Paths.get(diskInfo.mountPoint))
val fileSystemReportedUsableSpace = fileStore.getUsableSpace
val workingDirUsableSpace =
Math.min(diskInfo.configuredUsableSpace - totalUsage, fileSystemReportedUsableSpace)
logDebug(s"updateDiskInfos workingDirUsableSpace:$workingDirUsableSpace filemeta:$fileSystemReportedUsableSpace conf:${diskInfo.configuredUsableSpace} totalUsage:$totalUsage")
val totalSpace = fileStore.getTotalSpace
logDebug(s"updateDiskInfos workingDirUsableSpace:$workingDirUsableSpace filemeta:$fileSystemReportedUsableSpace conf:${diskInfo.configuredUsableSpace} totalUsage:$totalUsage totalSpace: $totalSpace")
diskInfo.setUsableSpace(workingDirUsableSpace)
diskInfo.setTotalSpace(totalUsage + workingDirUsableSpace)
diskInfo.setTotalSpace(totalSpace)
diskInfo.updateFlushTime()
diskInfo.updateFetchTime()
}
Expand Down

0 comments on commit b330b55

Please sign in to comment.