Skip to content

Commit

Permalink
[CELEBORN-1460] MRAppMasterWithCeleborn supports setting mapreduce.ce…
Browse files Browse the repository at this point in the history
…leborn.master.endpoints via environment variable CELEBORN_MASTER_ENDPOINTS

### What changes were proposed in this pull request?

`MRAppMasterWithCeleborn` sets `mapreduce.celeborn.master.endpoints` via environment variable `CELEBORN_MASTER_ENDPOINTS`.

### Why are the changes needed?

`MRAppMasterWithCeleborn` sets `mapreduce.celeborn.master.endpoints` via `${HADOOP_CONF_DIR}/mapred-site.xml` or `-Dmapreduce.celeborn.master.endpoints` at present. It could not set `mapreduce.celeborn.master.endpoints` by above way for integration with `RMProxy` which could provide `MRAppMasterWithCeleborn` with master endpoints via `environments` of `TaskAttemptImpl`. It's recommended that `MRAppMasterWithCeleborn` supports setting `mapreduce.celeborn.master.endpoints` via environment variable `CELEBORN_MASTER_ENDPOINTS`.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

`WordCountTest`

Closes apache#2558 from SteNicholas/CELEBORN-1460.

Authored-by: SteNicholas <programgeek@163.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
  • Loading branch information
SteNicholas authored and FMX committed Jun 17, 2024
1 parent cff2a7d commit c394fd4
Show file tree
Hide file tree
Showing 5 changed files with 22 additions and 7 deletions.
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,8 @@ Meanwhile, configure the following settings in YARN and MapReduce config.
-Dmapreduce.job.map.output.collector.class=org.apache.hadoop.mapred.CelebornMapOutputCollector
-Dmapreduce.job.reduce.shuffle.consumer.plugin.class=org.apache.hadoop.mapreduce.task.reduce.CelebornShuffleConsumer
```
**Note**: `MRAppMasterWithCeleborn` disables `yarn.app.mapreduce.am.job.recovery.enable` and sets `mapreduce.job.reduce.slowstart.completedmaps` to 1 by default.
**Note**: `MRAppMasterWithCeleborn` supports setting `mapreduce.celeborn.master.endpoints` via environment variable `CELEBORN_MASTER_ENDPOINTS`.
Meanwhile, `MRAppMasterWithCeleborn` disables `yarn.app.mapreduce.am.job.recovery.enable` and sets `mapreduce.job.reduce.slowstart.completedmaps` to 1 by default.

### Best Practice
If you want to set up a production-ready Celeborn cluster, your cluster should have at least 3 masters and at least 4 workers.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@
public class MRAppMasterWithCeleborn extends MRAppMaster {
private static final Logger logger = LoggerFactory.getLogger(MRAppMasterWithCeleborn.class);

private static final String MASTER_ENDPOINTS_ENV = "CELEBORN_MASTER_ENDPOINTS";

public MRAppMasterWithCeleborn(
ApplicationAttemptId applicationAttemptId,
ContainerId containerId,
Expand Down Expand Up @@ -124,8 +126,8 @@ private static String ensureGetSysEnv(String envName) throws IOException {
public static void main(String[] args) {
JobConf rmAppConf = new JobConf(new YarnConfiguration());
rmAppConf.addResource(new Path(MRJobConfig.JOB_CONF_FILE));
checkJobConf(rmAppConf);
try {
checkJobConf(rmAppConf);
Thread.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());
String containerIdStr = ensureGetSysEnv(ApplicationConstants.Environment.CONTAINER_ID.name());
String nodeHostString = ensureGetSysEnv(ApplicationConstants.Environment.NM_HOST.name());
Expand Down Expand Up @@ -184,7 +186,7 @@ public static void main(String[] args) {
}
}

public static void checkJobConf(JobConf conf) {
public static void checkJobConf(JobConf conf) throws IOException {
if (conf.getBoolean(MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, false)) {
logger.warn("MRAppMaster disables job recovery.");
// MapReduce does not set the flag which indicates whether to keep containers across
Expand All @@ -197,5 +199,14 @@ public static void checkJobConf(JobConf conf) {
// Make sure reduces are scheduled only after all map are completed.
conf.setFloat(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART, 1.0f);
}
String masterEndpointsKey = HadoopUtils.MR_PREFIX + CelebornConf.MASTER_ENDPOINTS().key();
String masterEndpointsVal = conf.get(masterEndpointsKey);
if (masterEndpointsVal == null || masterEndpointsVal.isEmpty()) {
logger.info(
"MRAppMaster sets {} via environment variable {}.",
masterEndpointsKey,
MASTER_ENDPOINTS_ENV);
conf.set(masterEndpointsKey, ensureGetSysEnv(MASTER_ENDPOINTS_ENV));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.celeborn.common.CelebornConf;

public class HadoopUtils {
public static final String MR_PREFIX = "mapreduce.";
public static final String MR_CELEBORN_CONF = "celeborn.xml";
public static final String MR_CELEBORN_LM_HOST = "celeborn.lifecycleManager.host";
public static final String MR_CELEBORN_LM_PORT = "celeborn.lifecycleManager.port";
Expand All @@ -34,8 +35,8 @@ public static CelebornConf fromYarnConf(JobConf conf) {
for (Map.Entry<String, String> property : conf) {
String proName = property.getKey();
String proValue = property.getValue();
if (proName.startsWith("mapreduce.celeborn")) {
tmpCelebornConf.set(proName.substring("mapreduce.".length()), proValue);
if (proName.startsWith(MR_PREFIX + "celeborn")) {
tmpCelebornConf.set(proName.substring(MR_PREFIX.length()), proValue);
}
}
return tmpCelebornConf;
Expand Down
3 changes: 2 additions & 1 deletion docs/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,8 @@ cp $CELEBORN_HOME/mr/<Celeborn Client Jar> <yarn.application.classpath>
</property>
</configuration>
```
**Note**: `MRAppMasterWithCeleborn` disables `yarn.app.mapreduce.am.job.recovery.enable` and sets `mapreduce.job.reduce.slowstart.completedmaps` to 1 by default.
**Note**: `MRAppMasterWithCeleborn` supports setting `mapreduce.celeborn.master.endpoints` via environment variable `CELEBORN_MASTER_ENDPOINTS`.
Meanwhile, `MRAppMasterWithCeleborn` disables `yarn.app.mapreduce.am.job.recovery.enable` and sets `mapreduce.job.reduce.slowstart.completedmaps` to 1 by default.

Then deploy the example word count to the running cluster for verifying whether above configurations are correct.
```shell
Expand Down
3 changes: 2 additions & 1 deletion docs/deploy.md
Original file line number Diff line number Diff line change
Expand Up @@ -243,4 +243,5 @@ Meanwhile, configure the following settings in YARN and MapReduce config.
-Dmapreduce.job.map.output.collector.class=org.apache.hadoop.mapred.CelebornMapOutputCollector
-Dmapreduce.job.reduce.shuffle.consumer.plugin.class=org.apache.hadoop.mapreduce.task.reduce.CelebornShuffleConsumer
```
**Note**: `MRAppMasterWithCeleborn` disables `yarn.app.mapreduce.am.job.recovery.enable` and sets `mapreduce.job.reduce.slowstart.completedmaps` to 1 by default.
**Note**: `MRAppMasterWithCeleborn` supports setting `mapreduce.celeborn.master.endpoints` via environment variable `CELEBORN_MASTER_ENDPOINTS`.
Meanwhile, `MRAppMasterWithCeleborn` disables `yarn.app.mapreduce.am.job.recovery.enable` and sets `mapreduce.job.reduce.slowstart.completedmaps` to 1 by default.

0 comments on commit c394fd4

Please sign in to comment.