Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Unix domain socket support #4846

Merged
merged 34 commits into from
Jun 8, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
1d3f630
Add server-side Unix domain socket support
trustin May 19, 2023
41ee453
Address the comments from @ikhoon
trustin May 20, 2023
999eca6
Domain socket support on macOS
trustin May 20, 2023
d473068
Lint
trustin May 22, 2023
6ea5bee
Add `Endpoint.toSocketAddress()` and `DomainSocketAddress.asEndpoint()`
trustin May 22, 2023
20c6709
Make `Endpoint` cache `InetSocketAddress` more aggressively / Addres…
trustin May 23, 2023
a5aa7fe
Simplify SessionProtocolNegotiationCache key generation
trustin May 23, 2023
2ac54ad
Update core/src/main/java/com/linecorp/armeria/client/Endpoint.java
trustin May 25, 2023
4b4c98c
Update core/src/main/java/com/linecorp/armeria/client/HttpClientFacto…
trustin May 25, 2023
73f5d22
Address the comments from @jrhee17 / Normalize IP addresses wherever …
trustin May 25, 2023
a4b24f6
Merge branch 'main' into uds
trustin May 25, 2023
c604c9c
Merge branch 'main' into uds
trustin May 30, 2023
ff89907
Merge branch 'main' into uds
trustin May 30, 2023
4f1d5fb
Fix an exception in `DefaultEventLoopScheduler`
trustin May 30, 2023
3f5bc3e
Don't throw exception when setting port/IP on domain socket endpoint
trustin May 30, 2023
9c48404
Make a domain socket endpoint always has predefined IP address and port
trustin May 30, 2023
1d2b297
Fix Endpoint.authority()
trustin May 30, 2023
6733934
Fix a bug in Endpoint.withHost()
trustin May 30, 2023
92ebac1
Address the comment from @ikhoon / Add `Endpoint.{with,without}Defaul…
trustin May 30, 2023
54aee42
Update core/src/main/java/com/linecorp/armeria/client/Endpoint.java
ikhoon Jun 3, 2023
324d470
Merge branch 'main' into pr-4846-trustin-uds
ikhoon Jun 3, 2023
37cbac2
resolve conflicts
ikhoon Jun 3, 2023
ee13b32
Remove an unused import
ikhoon Jun 3, 2023
80e7329
Merge branch 'main' into uds
ikhoon Jun 7, 2023
a4bcd25
Merge branch 'main' into uds
trustin Jun 7, 2023
5939891
Update core/src/main/java/com/linecorp/armeria/client/Endpoint.java
trustin Jun 7, 2023
22f637d
Update core/src/main/java/com/linecorp/armeria/client/Endpoint.java
trustin Jun 7, 2023
0e7bc01
Update core/src/main/java/com/linecorp/armeria/client/Endpoint.java
trustin Jun 7, 2023
fb84c46
Add `unix:` prefix to the boss thread names
trustin Jun 7, 2023
7834c7d
Update core/src/main/java/com/linecorp/armeria/client/HttpChannelPool…
trustin Jun 7, 2023
ac354c1
Address the comment by @jrhee17
trustin Jun 7, 2023
1ffd2ec
Lint
trustin Jun 7, 2023
168e71c
Oops
trustin Jun 8, 2023
4de1794
Lint
trustin Jun 8, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Make Endpoint cache InetSocketAddress more aggressively / Address…
… the comments from @ikhoon
  • Loading branch information
trustin committed May 23, 2023
commit 20c6709542de32b5b5878ebb1ddb07b62dc05c39
38 changes: 30 additions & 8 deletions core/src/main/java/com/linecorp/armeria/client/Endpoint.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package com.linecorp.armeria.client;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static java.util.Objects.requireNonNull;

import java.io.UnsupportedEncodingException;
Expand All @@ -30,7 +31,6 @@
import java.net.URISyntaxException;
import java.net.URLDecoder;
import java.net.UnknownHostException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Paths;
import java.util.Comparator;
import java.util.List;
Expand Down Expand Up @@ -161,17 +161,20 @@ public static Endpoint of(SocketAddress addr) {
}

if (addr instanceof DomainSocketAddress) {
return of(((DomainSocketAddress) addr).authority());
final DomainSocketAddress domainSocketAddr = (DomainSocketAddress) addr;
final Endpoint endpoint = unsafeCreate(domainSocketAddr.authority(), 0);
endpoint.socketAddress = domainSocketAddr;
return endpoint;
}

checkArgument(addr instanceof InetSocketAddress,
"unsupported address: %s", addr);

final InetSocketAddress inetAddr = (InetSocketAddress) addr;
@SuppressWarnings("resource")
final Endpoint endpoint = of(inetAddr.getHostString(), inetAddr.getPort());
return inetAddr.isUnresolved() ? endpoint
: endpoint.withIpAddr(inetAddr.getAddress().getHostAddress());
final String ipAddr = inetAddr.isUnresolved() ? null : inetAddr.getAddress().getHostAddress();
final Endpoint endpoint = of(inetAddr.getHostString(), inetAddr.getPort()).withIpAddr(ipAddr);
endpoint.socketAddress = inetAddr;
return endpoint;
}

/**
Expand Down Expand Up @@ -239,6 +242,8 @@ private enum HostType {
private CompletableFuture<Endpoint> selectFuture;
@Nullable
private CompletableFuture<List<Endpoint>> whenReadyFuture;
@Nullable
private InetSocketAddress socketAddress;
private int hashCode;

private Endpoint(String host, @Nullable String ipAddr, int port, int weight, HostType hostType,
Expand Down Expand Up @@ -365,7 +370,7 @@ public Endpoint withHost(String host) {
/**
* Returns the IP address of this endpoint.
*
* @return the IP address, or {@code null} if the host name is not resolved yet
* @return the IP address, or {@code null} if the host name is a domain socket address or not resolved yet
*/
@Nullable
public String ipAddr() {
Expand Down Expand Up @@ -395,7 +400,7 @@ public boolean isIpAddrOnly() {
* Returns the {@link StandardProtocolFamily} of this endpoint's IP address.
*
* @return the {@link StandardProtocolFamily} of this endpoint's IP address, or
* {@code null} if the host name is not resolved yet
* {@code null} if the host name is a domain socket address or not resolved yet.
*/
@Nullable
public StandardProtocolFamily ipFamily() {
Expand All @@ -416,6 +421,7 @@ public StandardProtocolFamily ipFamily() {
*/
@UnstableApi
public boolean isDomainSocket() {
trustin marked this conversation as resolved.
Show resolved Hide resolved

trustin marked this conversation as resolved.
Show resolved Hide resolved
return isDomainSocketAuthority(host);
}

Expand Down Expand Up @@ -535,6 +541,8 @@ public Endpoint withIpAddr(@Nullable String ipAddr) {
return this;
}

checkState(!isDomainSocket(), "A domain socket endpoint can't have an IP address.");

if (NetUtil.isValidIpV4Address(ipAddr)) {
return withIpAddr(ipAddr, StandardProtocolFamily.INET);
}
Expand Down Expand Up @@ -799,7 +807,21 @@ public URI toUri(Scheme scheme, @Nullable String path) {
* @see #hasPort()
* @see #isDomainSocket()
*/
@UnstableApi
public InetSocketAddress toSocketAddress(int defaultPort) {
trustin marked this conversation as resolved.
Show resolved Hide resolved
final InetSocketAddress socketAddress = this.socketAddress;
if (socketAddress != null) {
return socketAddress;
}

final InetSocketAddress newSocketAddress = toSocketAddress0(defaultPort);
if (hasPort() || isDomainSocket()) {
this.socketAddress = newSocketAddress;
}
return newSocketAddress;
}

private InetSocketAddress toSocketAddress0(int defaultPort) {
if (isDomainSocket()) {
final String decodedHost;
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,9 @@
*/
package com.linecorp.armeria.client;

import java.io.UnsupportedEncodingException;
import java.lang.reflect.Array;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.URLDecoder;
import java.net.UnknownHostException;
import java.nio.channels.UnsupportedAddressTypeException;
import java.util.ArrayDeque;
import java.util.ArrayList;
Expand Down Expand Up @@ -73,7 +69,6 @@
import io.netty.handler.proxy.Socks4ProxyHandler;
import io.netty.handler.proxy.Socks5ProxyHandler;
import io.netty.handler.ssl.SslContext;
import io.netty.util.NetUtil;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.Promise;
import reactor.core.scheduler.NonBlocking;
Expand Down Expand Up @@ -381,7 +376,7 @@ private void connect(SessionProtocol desiredProtocol, PoolKey key, ChannelAcquis
timingsBuilder.socketConnectStart();

// Fail immediately if it is certain that the remote address doesn't support the desired protocol.
final SocketAddress remoteAddress = key.remoteAddress;
final SocketAddress remoteAddress = key.toRemoteAddress();
if (SessionProtocolNegotiationCache.isUnsupported(remoteAddress, desiredProtocol)) {
notifyConnect(desiredProtocol, key,
eventLoop.newFailedFuture(
Expand Down Expand Up @@ -642,73 +637,24 @@ public void close() {
static final class PoolKey {
final Endpoint endpoint;
final ProxyConfig proxyConfig;
final SocketAddress remoteAddress;

private final int hashCode;
private final String strVal;

PoolKey(Endpoint endpoint, ProxyConfig proxyConfig) {
this.endpoint = endpoint;
this.proxyConfig = proxyConfig;
remoteAddress = toRemoteAddress(endpoint, proxyConfig);
hashCode = endpoint.hashCode() * 31 + proxyConfig.hashCode();
strVal = generateString(endpoint, proxyConfig);
}

private static String generateString(Endpoint endpoint, ProxyConfig proxyConfig) {
final String host = endpoint.host();
final String ipAddr = endpoint.ipAddr();
final int port = endpoint.port();
final boolean isDomainSocket = endpoint.isDomainSocket();
final String proxyConfigStr = proxyConfig.proxyType() != ProxyType.DIRECT ? proxyConfig.toString()
: null;
try (TemporaryThreadLocals tempThreadLocals = TemporaryThreadLocals.acquire()) {
final StringBuilder buf = tempThreadLocals.stringBuilder();
buf.append('{').append(host);
if (ipAddr != null) {
buf.append('/').append(ipAddr);
}
if (!isDomainSocket) {
buf.append(':').append(port);
}
if (proxyConfigStr != null) {
buf.append(" via ");
buf.append(proxyConfigStr);
}
buf.append('}');
return buf.toString();
}
}

private static SocketAddress toRemoteAddress(Endpoint endpoint, ProxyConfig proxyConfig) {
final String host = endpoint.host();
final String ipAddr = endpoint.ipAddr();
if (ipAddr != null) {
try {
return new InetSocketAddress(
InetAddress.getByAddress(host, NetUtil.createByteArrayFromIpAddressString(ipAddr)),
endpoint.port());
} catch (UnknownHostException e) {
// Should never reach here because `Endpoint` validates the IP address.
throw new Error(e);
}
SocketAddress toRemoteAddress() {
final InetSocketAddress remoteAddr = endpoint.toSocketAddress(-1);
if (remoteAddr instanceof com.linecorp.armeria.common.util.DomainSocketAddress) {
return ((com.linecorp.armeria.common.util.DomainSocketAddress) remoteAddr).asNettyAddress();
}

// ipAddr can be null for domain sockets.
if (endpoint.isDomainSocket()) {
final String path;
try {
path = URLDecoder.decode(host.substring(7), "UTF-8");
} catch (UnsupportedEncodingException e) {
// Should never reach here.
throw new Error(e);
}
return new DomainSocketAddress(path);
}
assert !remoteAddr.isUnresolved() || proxyConfig.proxyType().isForwardProxy()
: remoteAddr + ", " + proxyConfig;

// ipAddr can be null for forward proxies.
assert proxyConfig.proxyType().isForwardProxy() : proxyConfig;
return InetSocketAddress.createUnresolved(host, endpoint.port());
return remoteAddr;
}

@Override
Expand All @@ -734,7 +680,28 @@ public int hashCode() {

@Override
public String toString() {
return strVal;
final String host = endpoint.host();
final String ipAddr = endpoint.ipAddr();
final int port = endpoint.port();
final boolean isDomainSocket = endpoint.isDomainSocket();
final String proxyConfigStr = proxyConfig.proxyType() != ProxyType.DIRECT ? proxyConfig.toString()
: null;
try (TemporaryThreadLocals tempThreadLocals = TemporaryThreadLocals.acquire()) {
final StringBuilder buf = tempThreadLocals.stringBuilder();
buf.append('{').append(host);
if (ipAddr != null) {
buf.append('/').append(ipAddr);
}
if (!isDomainSocket) {
buf.append(':').append(port);
}
if (proxyConfigStr != null) {
buf.append(" via ");
buf.append(proxyConfigStr);
}
buf.append('}');
return buf.toString();
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,9 @@ public static boolean isDomainSocketAddress(InetAddress addr) {
private String authority;
@Nullable
private Endpoint endpoint;
@Nullable
@SuppressWarnings("NullableOnContainingClass") // ErrorProne false positive
private io.netty.channel.unix.DomainSocketAddress nettyAddress;

private DomainSocketAddress(Path path) {
super(toInetAddress(path), 1);
Expand Down Expand Up @@ -132,8 +135,16 @@ public String authority() {
*
* @return the converted Netty address
*/
public io.netty.channel.unix.DomainSocketAddress toNettyAddress() {
return new io.netty.channel.unix.DomainSocketAddress(path.toFile());
public io.netty.channel.unix.DomainSocketAddress asNettyAddress() {
final io.netty.channel.unix.DomainSocketAddress nettyAddress = this.nettyAddress;
if (nettyAddress != null) {
return nettyAddress;
}

final io.netty.channel.unix.DomainSocketAddress newNettyAddress =
new io.netty.channel.unix.DomainSocketAddress(path.toFile());
this.nettyAddress = newNettyAddress;
return newNettyAddress;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -578,7 +578,7 @@ private ChannelFuture doStart(ServerPort port) {
if (port.isDomainSocket()) {
if (transportType.supportsDomainSockets()) {
// Convert to Netty's DomainSocketAddress type.
localAddress = ((DomainSocketAddress) port.localAddress()).toNettyAddress();
localAddress = ((DomainSocketAddress) port.localAddress()).asNettyAddress();
channelType = transportType.domainServerChannelType();
} else {
throw new IllegalStateException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -539,6 +539,45 @@ void conversionToSocketAddress() throws Exception {
DomainSocketAddress.of(Paths.get("/foo.sock")));
}

@Test
void socketAddressCache() {
final Endpoint endpointWithPort = Endpoint.of("foo", 42);
assertThat(endpointWithPort.toSocketAddress(-1))
.isSameAs(endpointWithPort.toSocketAddress(1));

final Endpoint endpointWithoutPort = Endpoint.of("foo");
assertThat(endpointWithoutPort.toSocketAddress(80))
.isNotSameAs(endpointWithoutPort.toSocketAddress(80));

final Endpoint endpointWithDomainSocket = Endpoint.of("unix%3A%2Ffoo.sock");
assertThat(endpointWithDomainSocket.toSocketAddress(-1))
.isSameAs(endpointWithDomainSocket.toSocketAddress(1));
}

@Test
void socketAddressPrecache() throws Exception {
final DomainSocketAddress domainSocketAddress = DomainSocketAddress.of(Paths.get("/foo.sock"));
assertThat(Endpoint.of(domainSocketAddress).toSocketAddress(-1))
.isSameAs(domainSocketAddress);

final InetSocketAddress unresolvedSocketAddress = InetSocketAddress.createUnresolved("foo", 42);
assertThat(Endpoint.of(unresolvedSocketAddress).toSocketAddress(-1))
.isSameAs(unresolvedSocketAddress);

final InetSocketAddress resolvedSocketAddress = new InetSocketAddress(
InetAddress.getByAddress("foo", new byte[] { 127, 0, 0, 1 }), 42);
assertThat(Endpoint.of(resolvedSocketAddress).toSocketAddress(-1))
.isSameAs(resolvedSocketAddress);
}

@Test
void domainSocketWithIpAddr() {
final Endpoint endpoint = Endpoint.of("unix%3A%2Ffoo.sock");
assertThatThrownBy(() -> endpoint.withIpAddr("127.0.0.1"))
.isInstanceOf(IllegalStateException.class)
.hasMessageContaining("domain socket");
}

@Test
void setAndGetAttr() {
final Endpoint endpointA = Endpoint.parse("a");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,14 +107,10 @@ void domainKeyGeneration() {
}

private static InetSocketAddress toRemoteAddress(Endpoint endpoint) throws UnknownHostException {
final String ipAddr;
if (endpoint.hasIpAddr()) {
ipAddr = endpoint.ipAddr();
assert ipAddr != null;
} else {
ipAddr = "127.0.0.1"; // Do not resolve the host name but just use local address for test.
if (!endpoint.hasIpAddr()) {
endpoint = endpoint.withIpAddr("127.0.0.1");
}
return toRemoteAddress(endpoint.host(), ipAddr, endpoint.port());
return endpoint.toSocketAddress(-1);
}

private static InetSocketAddress toRemoteAddress(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
receivedBuffers.add((ByteBuf) msg);
}
});
final Channel ch = b.connect(domainSocketAddress().toNettyAddress())
final Channel ch = b.connect(domainSocketAddress().asNettyAddress())
.syncUninterruptibly()
.channel();

Expand Down