Skip to content

Commit

Permalink
[CELEBORN-1097] Optimize the retrieval of configuration in the intern…
Browse files Browse the repository at this point in the history
…alCreateClient

### What changes were proposed in this pull request?
Optimize the retrieval of configuration in the internalCreateClient

### Why are the changes needed?
Directly accessing configuration information through 'conf.xx' in 'internalCreateClient' is time-consuming.
![image](https://github.com/apache/incubator-celeborn/assets/107825064/315a5013-5dfb-4a44-bf1b-109fc4ecc654)

### Does this PR introduce _any_ user-facing change?

### How was this patch tested?

Closes apache#2055 from kerwin-zk/client-factory-conf.

Authored-by: xiyu.zk <xiyu.zk@alibaba-inc.com>
Signed-off-by: Fu Chen <cfmcgrady@gmail.com>
  • Loading branch information
kerwin-zk authored and cfmcgrady committed Oct 31, 2023
1 parent 70366ed commit cf194a5
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ public FlinkTransportClientFactory(TransportContext context, int fetchMaxRetries
super(context);
bufferSuppliers = JavaUtils.newConcurrentHashMap();
this.fetchMaxRetries = fetchMaxRetries;
this.pooledAllocator = new UnpooledByteBufAllocator(true);
}

public TransportClient createClientWithRetry(String remoteHost, int remotePort)
Expand Down Expand Up @@ -75,11 +76,6 @@ public TransportClient createClient(String remoteHost, int remotePort)
remoteHost, remotePort, -1, new TransportFrameDecoderWithBufferSupplier(bufferSuppliers));
}

@Override
protected void initializeMemoryAllocator() {
this.pooledAllocator = new UnpooledByteBufAllocator(true);
}

public void registerSupplier(long streamId, Supplier<ByteBuf> supplier) {
bufferSuppliers.put(streamId, supplier);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,34 +68,37 @@ private static class ClientPool {
private static final Logger logger = LoggerFactory.getLogger(TransportClientFactory.class);

private final TransportContext context;
private final TransportConf conf;
private final ConcurrentHashMap<SocketAddress, ClientPool> connectionPool;

/** Random number generator for picking connections between peers. */
private final Random rand;

private final int numConnectionsPerPeer;

private final int connectTimeoutMs;

private final int receiveBuf;

private final int sendBuf;
private final Class<? extends Channel> socketChannelClass;
private EventLoopGroup workerGroup;
protected ByteBufAllocator pooledAllocator;

public TransportClientFactory(TransportContext context) {
this.context = Preconditions.checkNotNull(context);
this.conf = context.getConf();
TransportConf conf = context.getConf();
this.connectionPool = JavaUtils.newConcurrentHashMap();
this.numConnectionsPerPeer = conf.numConnectionsPerPeer();
this.connectTimeoutMs = conf.connectTimeoutMs();
this.receiveBuf = conf.receiveBuf();
this.sendBuf = conf.sendBuf();
this.rand = new Random();

IOMode ioMode = IOMode.valueOf(conf.ioMode());
this.socketChannelClass = NettyUtils.getClientChannelClass(ioMode);
logger.info("mode " + ioMode + " threads " + conf.clientThreads());
this.workerGroup =
NettyUtils.createEventLoop(ioMode, conf.clientThreads(), conf.getModuleName() + "-client");
initializeMemoryAllocator();
}

protected void initializeMemoryAllocator() {
this.pooledAllocator = NettyUtils.getPooledByteBufAllocator(conf, null, false);
}

Expand Down Expand Up @@ -213,15 +216,15 @@ private TransportClient internalCreateClient(
// Disable Nagle's Algorithm since we don't want packets to wait
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.SO_KEEPALIVE, true)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, conf.connectTimeoutMs())
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeoutMs)
.option(ChannelOption.ALLOCATOR, pooledAllocator);

if (conf.receiveBuf() > 0) {
bootstrap.option(ChannelOption.SO_RCVBUF, conf.receiveBuf());
if (receiveBuf > 0) {
bootstrap.option(ChannelOption.SO_RCVBUF, receiveBuf);
}

if (conf.sendBuf() > 0) {
bootstrap.option(ChannelOption.SO_SNDBUF, conf.sendBuf());
if (sendBuf > 0) {
bootstrap.option(ChannelOption.SO_SNDBUF, sendBuf);
}

final AtomicReference<TransportClient> clientRef = new AtomicReference<>();
Expand All @@ -239,9 +242,9 @@ public void initChannel(SocketChannel ch) {

// Connect to the remote server
ChannelFuture cf = bootstrap.connect(address);
if (!cf.await(conf.connectTimeoutMs())) {
if (!cf.await(connectTimeoutMs)) {
throw new CelebornIOException(
String.format("Connecting to %s timed out (%s ms)", address, conf.connectTimeoutMs()));
String.format("Connecting to %s timed out (%s ms)", address, connectTimeoutMs));
} else if (cf.cause() != null) {
throw new CelebornIOException(String.format("Failed to connect to %s", address), cf.cause());
}
Expand Down

0 comments on commit cf194a5

Please sign in to comment.