Skip to content

Commit

Permalink
[CELEBORN-1034] Offer slots uses random range of available workers in…
Browse files Browse the repository at this point in the history
…stead of shuffling

### What changes were proposed in this pull request?
In original design, (primary worker, replica worker) pairs tends to stay stable, for example,
for primary PartitionLocations on Worker A, their replica PartitionLocations will all be on
Worker B in general scenarios, i.e. all workers are healthy and works well. This way, one Worker
will have only one (or very few) connections to other workers' replicate netty server.

However, apache#1790 calls `Collections.shuffle(availableWorkers)`,
causing the number of replica connections increases dramatically:
![image](https://github.com/apache/incubator-celeborn/assets/948245/013c7bc8-a224-413e-9c0c-519ae76c9d32)

### Why are the changes needed?
This PR refine the logic of selecting limited number of workers, instead of shuffling,
Master just randomly picks a range of available workers.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Manual test.

Closes apache#1975 from waitinfuture/1034.

Lead-authored-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
Co-authored-by: Keyong Zhou <waitinfuture@gmail.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
  • Loading branch information
2 people authored and FMX committed Oct 18, 2023
1 parent 8bf7e52 commit a5dfd67
Showing 1 changed file with 15 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -615,22 +615,31 @@ private[celeborn] class Master(
val numReducers = requestSlots.partitionIdList.size()
val shuffleKey = Utils.makeShuffleKey(requestSlots.applicationId, requestSlots.shuffleId)

var availableWorkers = workersAvailable()
Collections.shuffle(availableWorkers)
val availableWorkers = workersAvailable()
val numAvailableWorkers = availableWorkers.size()
val numWorkers = Math.min(
Math.max(
if (requestSlots.shouldReplicate) 2 else 1,
if (requestSlots.maxWorkers <= 0) slotsAssignMaxWorkers
else Math.min(slotsAssignMaxWorkers, requestSlots.maxWorkers)),
availableWorkers.size())
availableWorkers = availableWorkers.subList(0, numWorkers)
numAvailableWorkers)
val startIndex = Random.nextInt(numAvailableWorkers)
val selectedWorkers = new util.ArrayList[WorkerInfo](numWorkers)
selectedWorkers.addAll(availableWorkers.subList(
startIndex,
Math.min(numAvailableWorkers, startIndex + numWorkers)))
if (startIndex + numWorkers > numAvailableWorkers) {
selectedWorkers.addAll(availableWorkers.subList(
0,
startIndex + numWorkers - numAvailableWorkers))
}
// offer slots
val slots =
masterSource.sample(MasterSource.OFFER_SLOTS_TIME, s"offerSlots-${Random.nextInt()}") {
statusSystem.workers.synchronized {
if (slotsAssignPolicy == SlotsAssignPolicy.LOADAWARE && !hasHDFSStorage) {
SlotsAllocator.offerSlotsLoadAware(
availableWorkers,
selectedWorkers,
requestSlots.partitionIdList,
requestSlots.shouldReplicate,
requestSlots.shouldRackAware,
Expand All @@ -641,7 +650,7 @@ private[celeborn] class Master(
loadAwareFetchTimeWeight)
} else {
SlotsAllocator.offerSlotsRoundRobin(
availableWorkers,
selectedWorkers,
requestSlots.partitionIdList,
requestSlots.shouldReplicate,
requestSlots.shouldRackAware)
Expand Down

0 comments on commit a5dfd67

Please sign in to comment.