Skip to content

Commit

Permalink
[CELEBORN-1664] Fix secret fetch failures after LEADER master failover
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?
Fix a bug related to auth under master HA mode which would cause app failures when leader master restarts. Also, remove the secrets from memory after app lost.

Previous implementation add the registration & secret info in leader Master's memory, and push to other masters though apache#2346. After leader restarts, the info will only be in Ratis (AbstractMetaManager), however app still fetch it from new leader's memory, and would fail to get it.

Fix this by checking AbstractMetaManager's registration info if not found in memory, and properly authorize the app.

### Why are the changes needed?
When auth enabled, and leader master restart, there will be "Registration information not found" error on app side, and failed to send heartbeat to master. It will cause app to be removed on server side after heartbeat timeout, causing job to fail.
```
24/10/14 01:56:55 ERROR [celeborn-netty-rpc-connection-executor-3] client.TransportClientFactory: Exception while bootstrapping client after 71.4 ms
java.lang.RuntimeException: java.io.IOException: Exception in sendRpcSync to: celeborn-moka-test-manager-3/{ip}:9097
    at org.apache.celeborn.common.network.sasl.SaslClientBootstrap.doBootstrap(SaslClientBootstrap.java:110)
    at org.apache.celeborn.common.network.sasl.registration.RegistrationClientBootstrap.doSaslBootstrap(RegistrationClientBootstrap.java:228)
    at org.apache.celeborn.common.network.sasl.registration.RegistrationClientBootstrap.doBootstrap(RegistrationClientBootstrap.java:103)
    at org.apache.celeborn.common.network.client.TransportClientFactory.internalCreateClient(TransportClientFactory.java:307)
    at org.apache.celeborn.common.network.client.TransportClientFactory.createClient(TransportClientFactory.java:205)
    at org.apache.celeborn.common.network.client.TransportClientFactory.createClient(TransportClientFactory.java:133)
    at org.apache.celeborn.common.network.client.TransportClientFactory.createClient(TransportClientFactory.java:212)
    at org.apache.celeborn.common.rpc.netty.NettyRpcEnv.createClient(NettyRpcEnv.scala:232)
    at org.apache.celeborn.common.rpc.netty.Outbox$$anon$1.call(Outbox.scala:194)
    at org.apache.celeborn.common.rpc.netty.Outbox$$anon$1.call(Outbox.scala:190)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.io.IOException: Exception in sendRpcSync to: celeborn-moka-test-manager-3/{ip}:9097
    at org.apache.celeborn.common.network.client.TransportClient.sendRpcSync(TransportClient.java:324)
    at org.apache.celeborn.common.network.sasl.SaslClientBootstrap.doBootstrap(SaslClientBootstrap.java:95)
    ... 13 more
Caused by: java.util.concurrent.ExecutionException: java.io.IOException: java.lang.RuntimeException: Registration information not found for spark-402a80be70f74455b01
    at org.apache.celeborn.common.network.sasl.CelebornSaslServer$DigestCallbackHandler.handle(CelebornSaslServer.java:142)
    at java.security.sasl/com.sun.security.sasl.digest.DigestMD5Server.validateClientResponse(DigestMD5Server.java:589)
    at java.security.sasl/com.sun.security.sasl.digest.DigestMD5Server.evaluateResponse(DigestMD5Server.java:244)
    at org.apache.celeborn.common.network.sasl.CelebornSaslServer.response(CelebornSaslServer.java:84)
    at org.apache.celeborn.common.network.sasl.SaslRpcHandler.doAuthChallenge(SaslRpcHandler.java:99)
    at org.apache.celeborn.common.network.server.AbstractAuthRpcHandler.receive(AbstractAuthRpcHandler.java:58)
    at org.apache.celeborn.common.network.sasl.registration.RegistrationRpcHandler.processRpcMessage(RegistrationRpcHandler.java:175)
```
### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Tested on dev cluster and job can properly get the secrets after master failover

Closes apache#2826 from YutingWang98/fix_auth_master_ha.

Authored-by: YutingWang98 <69848459+YutingWang98@users.noreply.github.com>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
  • Loading branch information
YutingWang98 authored and Mridul Muralidharan committed Oct 22, 2024
1 parent 06bd39b commit 5b20300
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,29 +22,48 @@

import org.apache.celeborn.common.meta.ApplicationMeta;
import org.apache.celeborn.common.network.sasl.SecretRegistry;
import org.apache.celeborn.common.network.sasl.SecretRegistryImpl;
import org.apache.celeborn.service.deploy.master.clustermeta.IMetadataHandler;
import org.apache.celeborn.service.deploy.master.clustermeta.AbstractMetaManager;

/**
* A simple implementation of {@link SecretRegistry} that stores secrets in memory and Ratis. This
* persists an application secret in Ratis but the deletion of that secret happens when
* ApplicationLost is triggered.
* A simple implementation of {@link SecretRegistry} that stores secrets in Ratis. This persists an
* application secret in Ratis but the deletion of that secret happens when ApplicationLost is
* triggered.
*/
public class MasterSecretRegistryImpl extends SecretRegistryImpl {
public class MasterSecretRegistryImpl implements SecretRegistry {

private static final Logger LOG = LoggerFactory.getLogger(MasterSecretRegistryImpl.class);
private IMetadataHandler metadataHandler;
private AbstractMetaManager statusSystem;

@Override
public void register(String appId, String secret) {
super.register(appId, secret);
if (metadataHandler != null) {
LOG.info("Persisting metadata for appId: {}", appId);
metadataHandler.handleApplicationMeta(new ApplicationMeta(appId, secret));
LOG.info("Persisting metadata for appId: {}", appId);
statusSystem.handleApplicationMeta(new ApplicationMeta(appId, secret));
}

@Override
public void unregister(String appId) {
LOG.info("Removing metadata for appId: {}", appId);
statusSystem.removeApplicationMeta(appId);
}

@Override
public String getSecretKey(String appId) {
String secret = null;
LOG.debug("Fetching secret from metadata manager for appId: {}", appId);
ApplicationMeta applicationMeta = statusSystem.applicationMetas.get(appId);
if (applicationMeta != null) {
secret = applicationMeta.secret();
}
return secret;
}

@Override
public boolean isRegistered(String appId) {
LOG.info("Fetching registration status from metadata manager for appId: {}", appId);
return statusSystem.applicationMetas.containsKey(appId);
}

void setMetadataHandler(IMetadataHandler metadataHandler) {
this.metadataHandler = metadataHandler;
void setMetadataHandler(AbstractMetaManager statusSystem) {
this.statusSystem = statusSystem;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -538,6 +538,10 @@ public void updateApplicationMeta(ApplicationMeta applicationMeta) {
applicationMetas.putIfAbsent(applicationMeta.appId(), applicationMeta);
}

public void removeApplicationMeta(String appId) {
applicationMetas.remove(appId);
}

public int registeredShuffleCount() {
return registeredAppAndShuffles.values().stream().mapToInt(Set::size).sum();
}
Expand Down

0 comments on commit 5b20300

Please sign in to comment.