Skip to content

Commit

Permalink
[CELEBORN-1093] Improve setup endpoint
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?
Avoid transform RpcEndpointAddress -> celebornUrl -> RpcEndpointAddress when setupEndpointRef.

### Why are the changes needed?
Ditto

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Cluster test.

Closes apache#2049 from onebox-li/improve-setup-endpoint.

Authored-by: onebox-li <lyh-36@163.com>
Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
  • Loading branch information
onebox-li authored and waitinfuture committed Oct 31, 2023
1 parent e02cde0 commit 11fe324
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -81,23 +81,23 @@ abstract class RpcEnv(conf: CelebornConf) {
def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef

/**
* Retrieve the [[RpcEndpointRef]] represented by `uri` asynchronously.
* Retrieve the [[RpcEndpointRef]] represented by `addr` asynchronously.
*/
def asyncSetupEndpointRefByURI(uri: String): Future[RpcEndpointRef]
def asyncSetupEndpointRefByAddr(addr: RpcEndpointAddress): Future[RpcEndpointRef]

/**
* Retrieve the [[RpcEndpointRef]] represented by `uri`. This is a blocking action.
* Retrieve the [[RpcEndpointRef]] represented by `addr`. This is a blocking action.
*/
def setupEndpointRefByURI(uri: String): RpcEndpointRef = {
defaultLookupTimeout.awaitResult(asyncSetupEndpointRefByURI(uri))
def setupEndpointRefByAddr(addr: RpcEndpointAddress): RpcEndpointRef = {
defaultLookupTimeout.awaitResult(asyncSetupEndpointRefByAddr(addr))
}

/**
* Retrieve the [[RpcEndpointRef]] represented by `address` and `endpointName`.
* This is a blocking action.
*/
def setupEndpointRef(address: RpcAddress, endpointName: String): RpcEndpointRef = {
setupEndpointRefByURI(RpcEndpointAddress(address, endpointName).toString)
setupEndpointRefByAddr(RpcEndpointAddress(address, endpointName))
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,18 +119,16 @@ class NettyRpcEnv(
}
}

def asyncSetupEndpointRefByURI(uri: String): Future[RpcEndpointRef] = {
val addr = RpcEndpointAddress(uri)
val endpointRef = new NettyRpcEndpointRef(conf, addr, this)
def asyncSetupEndpointRefByAddr(addr: RpcEndpointAddress): Future[RpcEndpointRef] = {
val verifier = new NettyRpcEndpointRef(
conf,
RpcEndpointAddress(addr.rpcAddress, RpcEndpointVerifier.NAME),
this)
verifier.ask[Boolean](RpcEndpointVerifier.CheckExistence(endpointRef.name)).flatMap { find =>
verifier.ask[Boolean](RpcEndpointVerifier.CheckExistence(addr.name)).flatMap { find =>
if (find) {
Future.successful(endpointRef)
Future.successful(new NettyRpcEndpointRef(conf, addr, this))
} else {
Future.failed(new RpcEndpointNotFoundException(uri))
Future.failed(new RpcEndpointNotFoundException(addr.toString))
}
}(ThreadUtils.sameThread)
}
Expand Down

0 comments on commit 11fe324

Please sign in to comment.