Skip to content

Commit

Permalink
[CELEBORN-1529] Read shuffle data from S3
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?
This PR adds support for reading shuffle data from S3-compatible storage (`s3a://` paths), alongside the existing HDFS read path.

### Why are the changes needed?

The change aims to make Celeborn read shuffle data from S3

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?
Covered by the unit tests added/updated in this PR (S3 path validation in `UtilsSuite`, S3 slot allocation in the `SlotsAllocator` tests, and the S3 storage level in `StoragePolicySuite`).

Closes apache#2651 from zhaohehuhu/dev-0726.

Authored-by: zhaohehuhu <luoyedeyi@163.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
  • Loading branch information
zhaohehuhu authored and FMX committed Aug 12, 2024
1 parent 3234bef commit 59b88be
Show file tree
Hide file tree
Showing 6 changed files with 65 additions and 19 deletions.
4 changes: 2 additions & 2 deletions LICENSE-binary
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,7 @@
------------------------------------------------------------------------------------
This project bundles the following dependencies under the Apache License 2.0 (http://www.apache.org/licenses/LICENSE-2.0.txt):

com.amazonaws:aws-java-sdk-bundle
com.fasterxml.jackson.core:jackson-annotations
com.fasterxml.jackson.core:jackson-core
com.fasterxml.jackson.core:jackson-databind
Expand Down Expand Up @@ -261,9 +262,9 @@ io.swagger.core.v3:swagger-models
me.bechberger:ap-loader-all
org.apache.commons:commons-crypto
org.apache.commons:commons-lang3
org.apache.hadoop:hadoop-aws
org.apache.hadoop:hadoop-client-api
org.apache.hadoop:hadoop-client-runtime
org.apache.hadoop:hadoop-aws
org.apache.ibatis:mybatis
org.apache.logging.log4j:log4j-1.2-api
org.apache.logging.log4j:log4j-api
Expand Down Expand Up @@ -308,7 +309,6 @@ org.slf4j:jcl-over-slf4j
org.webjars:swagger-ui
org.xerial.snappy:snappy-java
org.yaml:snakeyaml
com.amazonaws:aws-java-sdk-bundle

------------------------------------------------------------------------------------
This product bundles various third-party components under other open source licenses.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -470,6 +470,7 @@ private PartitionReader createReader(
fetchChunkMaxRetry,
callback);
}
case S3:
case HDFS:
return new DfsPartitionReader(
conf, shuffleKey, location, clientFactory, startMapIndex, endMapIndex, callback);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import scala.concurrent.duration._
import scala.util.Try
import scala.util.matching.Regex

import org.apache.celeborn.common.CelebornConf.{MASTER_INTERNAL_ENDPOINTS, S3_ACCESS_KEY, S3_DIR, S3_SECRET_KEY}
import org.apache.celeborn.common.authentication.AnonymousAuthenticationProviderImpl
import org.apache.celeborn.common.identity.{DefaultIdentityProvider, IdentityProvider}
import org.apache.celeborn.common.internal.Logging
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,18 @@ class UtilsSuite extends CelebornFunSuite {
assert(false == Utils.isHdfsPath(localPath))
}

test("validate s3 compatible fs path") {
  // Representative DFS URIs: one HDFS path plus the three S3A file shapes
  // Celeborn produces (data file, sorted file, index file).
  val hdfsUri = "hdfs://xxx:9000/xxxx/xx-xx/x-x-x"
  val plainS3Uri = "s3a://xxxx/xx-xx/x-x-x"
  val sortedS3Uri = "s3a://xxx/xxxx/xx-xx/x-x-x.sorted"
  val indexS3Uri = "s3a://xxx/xxxx/xx-xx/x-x-x.index"

  // The two classifiers must not cross-match: HDFS is not S3 and vice versa.
  assert(!Utils.isS3Path(hdfsUri))
  assert(!Utils.isHdfsPath(plainS3Uri))

  // Every s3a:// variant is recognized as an S3 path.
  assert(Utils.isS3Path(plainS3Uri))
  assert(Utils.isS3Path(sortedS3Uri))
  assert(Utils.isS3Path(indexS3Uri))
}

test("GetReducerFileGroupResponse class convert with pb") {
val fileGroup = new util.HashMap[Integer, util.Set[PartitionLocation]]
fileGroup.put(0, partitionLocation(0))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -286,21 +286,27 @@ private void check(
}
}

private void checkSlotsOnHDFS(
private void checkSlotsOnDFS(
List<WorkerInfo> workers,
List<Integer> partitionIds,
boolean shouldReplicate,
boolean expectSuccess,
boolean roundrobin) {
String shuffleKey = "appId-1";
boolean roundRobin,
boolean enableS3) {
CelebornConf conf = new CelebornConf();
conf.set("celeborn.active.storage.levels", "HDFS");
if (enableS3) {
conf.set("celeborn.active.storage.levels", "S3");
} else {
conf.set("celeborn.active.storage.levels", "HDFS");
}
Map<WorkerInfo, Tuple2<List<PartitionLocation>, List<PartitionLocation>>> slots;
if (roundrobin) {
if (roundRobin) {
int availableStorageTypes = enableS3 ? StorageInfo.S3_MASK : StorageInfo.HDFS_MASK;
slots =
SlotsAllocator.offerSlotsRoundRobin(
workers, partitionIds, shouldReplicate, false, StorageInfo.HDFS_MASK);
workers, partitionIds, shouldReplicate, false, availableStorageTypes);
} else {
int availableStorageTypes = enableS3 ? StorageInfo.S3_MASK : StorageInfo.HDFS_MASK;
slots =
SlotsAllocator.offerSlotsLoadAware(
workers,
Expand All @@ -313,14 +319,11 @@ private void checkSlotsOnHDFS(
0.1,
0,
1,
StorageInfo.LOCAL_DISK_MASK | StorageInfo.HDFS_MASK);
StorageInfo.LOCAL_DISK_MASK | availableStorageTypes);
}
int allocatedPartitionCount = 0;
Map<WorkerInfo, Map<String, Integer>> slotsDistribution =
SlotsAllocator.slotsToDiskAllocations(slots);
for (Map.Entry<WorkerInfo, Tuple2<List<PartitionLocation>, List<PartitionLocation>>>
workerToPartitions : slots.entrySet()) {
WorkerInfo workerInfo = workerToPartitions.getKey();
List<PartitionLocation> primaryLocs = workerToPartitions.getValue()._1;
List<PartitionLocation> replicaLocs = workerToPartitions.getValue()._2();
allocatedPartitionCount += primaryLocs.size();
Expand All @@ -339,7 +342,7 @@ public void testHDFSOnly() {
partitionIds.add(i);
}
final boolean shouldReplicate = true;
checkSlotsOnHDFS(workers, partitionIds, shouldReplicate, true, true);
checkSlotsOnDFS(workers, partitionIds, shouldReplicate, true, true, false);
}

@Test
Expand All @@ -365,7 +368,7 @@ public void testLocalDisksAndHDFSOnRoundRobin() {
partitionIds.add(i);
}
final boolean shouldReplicate = true;
checkSlotsOnHDFS(workers, partitionIds, shouldReplicate, true, true);
checkSlotsOnDFS(workers, partitionIds, shouldReplicate, true, true, false);
}

@Test
Expand All @@ -390,7 +393,7 @@ public void testLocalDisksAndHDFSOnLoadAware() {
partitionIds.add(i);
}
final boolean shouldReplicate = true;
checkSlotsOnHDFS(workers, partitionIds, shouldReplicate, true, false);
checkSlotsOnDFS(workers, partitionIds, shouldReplicate, true, false, false);
}

@Test
Expand Down Expand Up @@ -418,7 +421,7 @@ public void testLocalDisksAndHDFSOnLoadAwareWithInsufficientSlots() {
partitionIds.add(i);
}
final boolean shouldReplicate = true;
checkSlotsOnHDFS(workers, partitionIds, shouldReplicate, true, false);
checkSlotsOnDFS(workers, partitionIds, shouldReplicate, true, false, false);
}

@Test
Expand All @@ -432,4 +435,35 @@ public void testAllocateSlotsWithNoAvailableSlots() {

check(workers, partitionIds, shouldReplicate, true);
}

@Test
public void testS3Only() {
  // Workers prepared without local disks: every slot must land on S3 storage.
  final List<WorkerInfo> workers = prepareWorkers(false);
  final int partitionCount = 3000;
  final List<Integer> partitionIds = new ArrayList<>(partitionCount);
  while (partitionIds.size() < partitionCount) {
    partitionIds.add(partitionIds.size());
  }
  // Round-robin allocation with replication enabled, S3 enabled; expect success.
  final boolean shouldReplicate = true;
  checkSlotsOnDFS(workers, partitionIds, shouldReplicate, true, true, true);
}

@Test
public void testLocalDisksAndS3() {
  // Workers prepared WITH local disks; additionally attach an S3 "disk"
  // (unlimited capacity and slots) to the first two workers.
  final List<WorkerInfo> workers = prepareWorkers(true);
  for (int workerIdx = 0; workerIdx < 2; workerIdx++) {
    DiskInfo s3Disk =
        new DiskInfo("S3", Long.MAX_VALUE, 999999, 999999, Integer.MAX_VALUE, StorageInfo.Type.S3);
    s3Disk.maxSlots_$eq(Long.MAX_VALUE);
    workers.get(workerIdx).diskInfos().put("S3", s3Disk);
  }
  final List<Integer> partitionIds = new ArrayList<>();
  for (int partitionId = 0; partitionId < 3000; partitionId++) {
    partitionIds.add(partitionId);
  }
  final boolean shouldReplicate = true;
  // Exercise both allocation strategies with S3 enabled:
  // round-robin first, then load-aware.
  checkSlotsOnDFS(workers, partitionIds, shouldReplicate, true, true, true);
  checkSlotsOnDFS(workers, partitionIds, shouldReplicate, true, false, true);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,15 +58,15 @@ class StoragePolicySuite extends CelebornFunSuite {

test("test create file order case1") {
  // With MEMORY listed first in the create-file policy, the storage policy
  // should produce an in-memory file.
  val celebornConf = new CelebornConf()
  celebornConf.set(
    "celeborn.worker.storage.storagePolicy.createFilePolicy",
    "MEMORY,SSD,HDD,HDFS,OSS,S3")
  val policy = new StoragePolicy(celebornConf, mockedStorageManager, mockedSource)
  val createdFile = policy.createFile(mockedPartitionWriterContext)
  assert(createdFile.isInstanceOf[CelebornMemoryFile])
}

test("test create file order case2") {
val conf = new CelebornConf()
conf.set("celeborn.worker.storage.storagePolicy.createFilePolicy", "SSD,HDD,HDFS,OSS")
conf.set("celeborn.worker.storage.storagePolicy.createFilePolicy", "SSD,HDD,HDFS,OSS,S3")
val storagePolicy = new StoragePolicy(conf, mockedStorageManager, mockedSource)
val file = storagePolicy.createFile(mockedPartitionWriterContext)
assert(file.isInstanceOf[CelebornDiskFile])
Expand Down

0 comments on commit 59b88be

Please sign in to comment.