Skip to content

Commit

Permalink
[CELEBORN-1513] Support wildcard bind in dual stack environments
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?
Support wildcard bind for RPC and HTTP servers. When wildcard address is used, the service is able to listen to both ipv4 and ipv6 traffic in dual-stack environments.

The specific scenario where this becomes relevant is as follows:

If some of the compute infrastructure is IPv4 only, some v6 only and others dual stack - the way we can have a single Celeborn infra to cater to all is by:
a) Set bind.preferip to false - so that advertised address is the host and not IP.

b) bind to wild card address

With both in place, the v4 only compute nodes will resolve the v4 address and connect to v4 ip/port.
Likewise, for v6 only.
Dual stack compute nodes will use prefer ipv6 Java flag to resolve to either v4 or v6.

This is how we are handling the combination of scenarios internally.

### Why are the changes needed?
see above.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Tested on a server using netstat, and tried connecting to via `nc -4` and `nc -6` to ensure connection was there.

Closes apache#2713 from akpatnam25/CELEBORN-1513-fix.

Authored-by: Aravind Patnam <apatnam@linkedin.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
  • Loading branch information
Aravind Patnam authored and FMX committed Sep 6, 2024
1 parent 9ec345b commit f06dba1
Show file tree
Hide file tree
Showing 10 changed files with 103 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,4 +44,6 @@ public class TransportModuleConstants {
@Deprecated public static final String RPC_MODULE = "rpc";

public static final String DATA_MODULE = "data";

public static final String WILDCARD_BIND_ADDRESS = null;
}
Original file line number Diff line number Diff line change
Expand Up @@ -505,6 +505,8 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se
// Network //
// //////////////////////////////////////////////////////
def bindPreferIP: Boolean = get(NETWORK_BIND_PREFER_IP)
def advertisePreferIP: Boolean = get(NETWORK_ADVERTISE_PREFER_IP)
def bindWildcardAddress: Boolean = get(NETWORK_WILDCARD_ADDRESS_BIND)
def portMaxRetries: Int = get(PORT_MAX_RETRY)
def networkTimeout: RpcTimeout =
new RpcTimeout(get(NETWORK_TIMEOUT).milli, NETWORK_TIMEOUT.key)
Expand Down Expand Up @@ -675,6 +677,9 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se

def masterHost: String = get(MASTER_HOST).replace("<localhost>", Utils.localHostName(this))

def advertiseAddressMasterHost: String = get(MASTER_HOST)
.replace("<localhost>", Utils.getHostName(this.advertisePreferIP))

def masterHttpHost: String =
get(MASTER_HTTP_HOST).replace("<localhost>", Utils.localHostName(this))

Expand Down Expand Up @@ -1725,6 +1730,23 @@ object CelebornConf extends Logging {
.intConf
.createOptional

val NETWORK_WILDCARD_ADDRESS_BIND: ConfigEntry[Boolean] =
buildConf("celeborn.network.bind.wildcardAddress")
.categories("network")
.version("0.6.0")
.doc("When `true`, the bind address will be set to a wildcard address, while the advertise address will " +
"remain as whatever is set by `celeborn.network.advertise.preferIpAddress`. This is helpful in dual-stack " +
"environments, where the service must listen to both IPv4 and IPv6 clients.")
.booleanConf
.createWithDefault(false)

val NETWORK_ADVERTISE_PREFER_IP: ConfigEntry[Boolean] =
buildConf("celeborn.network.advertise.preferIpAddress")
.categories("network")
.version("0.6.0")
.doc("When `true`, prefer to use IP address, otherwise FQDN for advertise address.")
.fallbackConf(NETWORK_BIND_PREFER_IP)

val NETWORK_MEMORY_ALLOCATOR_VERBOSE_METRIC: ConfigEntry[Boolean] =
buildConf("celeborn.network.memory.allocator.verbose.metric")
.categories("network")
Expand Down
25 changes: 25 additions & 0 deletions common/src/main/scala/org/apache/celeborn/common/rpc/RpcEnv.scala
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import org.apache.celeborn.common.CelebornConf
import org.apache.celeborn.common.metrics.source.AbstractSource
import org.apache.celeborn.common.protocol.TransportModuleConstants
import org.apache.celeborn.common.rpc.netty.NettyRpcEnvFactory
import org.apache.celeborn.common.util.Utils

/**
* A RpcEnv implementation must have a [[RpcEnvFactory]] implementation with an empty constructor
Expand All @@ -42,6 +43,30 @@ object RpcEnv {
create(name, transportModule, host, host, port, conf, 0, securityContext)
}

def create(
name: String,
transportModule: String,
host: String,
port: Int,
conf: CelebornConf,
numUsableCores: Int,
securityContext: Option[RpcSecurityContext],
source: Option[AbstractSource]): RpcEnv = {
val bindAddress =
if (conf.bindWildcardAddress) TransportModuleConstants.WILDCARD_BIND_ADDRESS else host
val advertiseAddress = Utils.localHostNameForAdvertiseAddress(conf, name)
create(
name,
transportModule,
bindAddress,
advertiseAddress,
port,
conf,
numUsableCores,
securityContext,
source)
}

def create(
name: String,
transportModule: String,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -395,8 +395,13 @@ private[celeborn] class NettyRpcEnvFactory extends RpcEnvFactory with Logging {
new JavaSerializer(celebornConf).newInstance().asInstanceOf[JavaSerializerInstance]
val nettyEnv = new NettyRpcEnv(config, javaSerializerInstance)
val startNettyRpcEnv: Int => (NettyRpcEnv, Int) = { actualPort =>
logInfo(s"Starting RPC Server [${config.name}] on ${config.bindAddress}:$actualPort " +
s"with advisor endpoint ${config.advertiseAddress}:$actualPort")
if (celebornConf.bindWildcardAddress) {
logInfo(s"Starting RPC Server [${config.name}] on wildcard address with port" +
s" $actualPort, and advertised endpoint ${config.advertiseAddress}:$actualPort")
} else {
logInfo(s"Starting RPC Server [${config.name}] on ${config.bindAddress}:$actualPort " +
s"with advertised endpoint ${config.advertiseAddress}:$actualPort")
}
nettyEnv.startServer(config.bindAddress, actualPort)
(nettyEnv, nettyEnv.address.port)
}
Expand Down
27 changes: 24 additions & 3 deletions common/src/main/scala/org/apache/celeborn/common/util/Utils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ import org.apache.celeborn.common.internal.Logging
import org.apache.celeborn.common.meta.{DiskStatus, WorkerInfo}
import org.apache.celeborn.common.network.protocol.TransportMessage
import org.apache.celeborn.common.network.util.TransportConf
import org.apache.celeborn.common.protocol.{PartitionLocation, PartitionSplitMode, PartitionType, TransportModuleConstants}
import org.apache.celeborn.common.protocol.{PartitionLocation, PartitionSplitMode, PartitionType, RpcNameConstants, TransportModuleConstants}
import org.apache.celeborn.common.protocol.message.{ControlMessages, Message, StatusCode}
import org.apache.celeborn.common.protocol.message.ControlMessages.WorkerResource
import org.apache.celeborn.reflect.DynConstructors
Expand Down Expand Up @@ -415,8 +415,29 @@ object Utils extends Logging {
customHostname = Some(hostname)
}

def localHostName(conf: CelebornConf): String = customHostname.getOrElse {
if (conf.bindPreferIP) {
def localHostName(conf: CelebornConf): String = {
getHostName(conf.bindPreferIP)
}

def localHostNameForAdvertiseAddress(conf: CelebornConf, env: String): String = {
if (env.equals(RpcNameConstants.MASTER_SYS) || env.equals(
RpcNameConstants.MASTER_INTERNAL_SYS)) {
getAdvertiseAddressForMaster(conf)
} else {
getAdvertiseAddressForWorker(conf)
}
}

private def getAdvertiseAddressForMaster(conf: CelebornConf) = {
conf.advertiseAddressMasterHost
}

private def getAdvertiseAddressForWorker(conf: CelebornConf) = {
getHostName(conf.advertisePreferIP)
}

def getHostName(preferIP: Boolean): String = customHostname.getOrElse {
if (preferIP) {
localIpAddress match {
case ipv6Address: Inet6Address =>
val ip = ipv6Address.getHostAddress
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -280,10 +280,11 @@ class WorkerInfoSuite extends CelebornFunSuite {
"mockEnv",
TransportModuleConstants.RPC_SERVICE_MODULE,
"localhost",
"localhost",
12345,
conf,
64)
64,
None,
None)
val worker4 = new WorkerInfo(
"h4",
40001,
Expand Down
2 changes: 2 additions & 0 deletions docs/configuration/network.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,9 @@ license: |
| celeborn.&lt;module&gt;.push.timeoutCheck.threads | 4 | false | Threads num for checking push data timeout. If setting <module> to `data`, it works for shuffle client push data. If setting <module> to `push`, it works for Flink shuffle client push data. If setting <module> to `replicate`, it works for replicate client of worker replicating data to peer worker. | 0.3.0 | |
| celeborn.&lt;role&gt;.rpc.dispatcher.threads | &lt;value of celeborn.rpc.dispatcher.threads&gt; | false | Threads number of message dispatcher event loop for roles | | |
| celeborn.io.maxDefaultNettyThreads | 64 | false | Max default netty threads | 0.3.2 | |
| celeborn.network.advertise.preferIpAddress | &lt;value of celeborn.network.bind.preferIpAddress&gt; | false | When `true`, prefer to use IP address, otherwise FQDN for advertise address. | 0.6.0 | |
| celeborn.network.bind.preferIpAddress | true | false | When `true`, prefer to use IP address, otherwise FQDN. This configuration only takes effects when the bind hostname is not set explicitly, in such case, Celeborn will find the first non-loopback address to bind. | 0.3.0 | |
| celeborn.network.bind.wildcardAddress | false | false | When `true`, the bind address will be set to a wildcard address, while the advertise address will remain as whatever is set by `celeborn.network.advertise.preferIpAddress`. This is helpful in dual-stack environments, where the service must listen to both IPv4 and IPv6 clients. | 0.6.0 | |
| celeborn.network.connect.timeout | 10s | false | Default socket connect timeout. | 0.2.0 | |
| celeborn.network.memory.allocator.numArenas | &lt;undefined&gt; | false | Number of arenas for pooled memory allocator. Default value is Runtime.getRuntime.availableProcessors, min value is 2. | 0.3.0 | |
| celeborn.network.memory.allocator.verbose.metric | false | false | Whether to enable verbose metric for pooled allocator. | 0.3.0 | |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,10 +96,11 @@ private[celeborn] class Master(
RpcNameConstants.MASTER_SYS,
TransportModuleConstants.RPC_SERVICE_MODULE,
masterArgs.host,
masterArgs.host,
masterArgs.port,
conf,
Math.max(64, Runtime.getRuntime.availableProcessors()))
Math.max(64, Runtime.getRuntime.availableProcessors()),
None,
None)
} else {
val externalSecurityContext = new RpcSecurityContextBuilder()
.withServerSaslContext(
Expand All @@ -112,11 +113,11 @@ private[celeborn] class Master(
RpcNameConstants.MASTER_SYS,
TransportModuleConstants.RPC_SERVICE_MODULE,
masterArgs.host,
masterArgs.host,
masterArgs.port,
conf,
Math.max(64, Runtime.getRuntime.availableProcessors()),
Some(externalSecurityContext))
Some(externalSecurityContext),
None)
}

// Visible for testing
Expand All @@ -130,10 +131,11 @@ private[celeborn] class Master(
RpcNameConstants.MASTER_INTERNAL_SYS,
TransportModuleConstants.RPC_SERVICE_MODULE,
masterArgs.host,
masterArgs.host,
masterArgs.internalPort,
conf,
Math.max(64, Runtime.getRuntime.availableProcessors()))
Math.max(64, Runtime.getRuntime.availableProcessors()),
None,
None)
}

private val rackResolver = new CelebornRackResolver(conf)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import org.eclipse.jetty.servlet.FilterHolder
import org.apache.celeborn.common.CelebornConf
import org.apache.celeborn.common.internal.Logging
import org.apache.celeborn.common.meta.WorkerInfo
import org.apache.celeborn.common.protocol.TransportModuleConstants
import org.apache.celeborn.common.util.Utils
import org.apache.celeborn.server.common.http.HttpServer
import org.apache.celeborn.server.common.http.api.ApiRootResource
Expand Down Expand Up @@ -210,11 +211,15 @@ abstract class HttpService extends Service with Logging {
}

private def httpHost(): String = {
serviceName match {
case Service.MASTER =>
conf.masterHttpHost
case Service.WORKER =>
conf.workerHttpHost
if (conf.bindWildcardAddress) {
TransportModuleConstants.WILDCARD_BIND_ADDRESS
} else {
serviceName match {
case Service.MASTER =>
conf.masterHttpHost
case Service.WORKER =>
conf.workerHttpHost
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,6 @@ private[celeborn] class Worker(
RpcNameConstants.WORKER_SYS,
TransportModuleConstants.RPC_SERVICE_MODULE,
workerArgs.host,
workerArgs.host,
workerArgs.port,
conf,
Math.min(64, Math.max(4, Runtime.getRuntime.availableProcessors())),
Expand All @@ -118,7 +117,6 @@ private[celeborn] class Worker(
RpcNameConstants.WORKER_SYS,
TransportModuleConstants.RPC_SERVICE_MODULE,
workerArgs.host,
workerArgs.host,
workerArgs.port,
conf,
Math.max(64, Runtime.getRuntime.availableProcessors()),
Expand All @@ -134,7 +132,6 @@ private[celeborn] class Worker(
RpcNameConstants.WORKER_INTERNAL_SYS,
TransportModuleConstants.RPC_SERVICE_MODULE,
workerArgs.host,
workerArgs.host,
workerArgs.internalPort,
conf,
Math.min(64, Math.max(4, Runtime.getRuntime.availableProcessors())),
Expand Down

0 comments on commit f06dba1

Please sign in to comment.