Skip to content

Commit

Permalink
Add server-side Unix domain socket support
Browse files Browse the repository at this point in the history
Motivation:

In service mesh environment, it's quite common for a sidecar to
communicate with an app using a Unix domain socket. Armeria could be
used in such a scenarios if it is capable of serving requests on a Unix
domain socket.

Modifications:

- Added `DomainSocketAddress`
- Added `ServerPort.isDomainSocket()`
- Added various getters to `TransportType` and `TransportTypeProvider`
  so it provides the transport-specific information about domain socket
  classes and whether the transport supports domain sockets
- Added `ChannelUtil.localAddress()` and `remoteAddress()` and replace
  the direct invocation of `Channel.localAddress()` and
  `remoteAddress()` to work around the issue where Netty's domain socket
  channel returns `null`
  - See: netty/netty#13323
- Added `ChannelUtil.isTcpOption()` to determine whether the given
  `ChannelOption` is for TCP or not, so that a user doesn't get a WARN
  log when they use domain sockets
- Made `ServerRule` and `ServerExtension`not instantiate their default
  `WebClient`s lazily, so that a user can start a serer that listens
  only on a domain socket

Result:

- Armeria is now capable of serving requests from a Unix domain socket,
  making it more service-mesh-friendly.

Fix leaks

Update Netty to 4.1.92

Cache remote/localAddress / SocketAddress -> InetSocketAddress

Add client-side domain socket support

Lint

Windows

Formatting

Windows / Lint

Review comments
  • Loading branch information
trustin committed May 19, 2023
1 parent f9ea26d commit e65ad13
Show file tree
Hide file tree
Showing 41 changed files with 816 additions and 249 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

import static com.google.common.base.MoreObjects.firstNonNull;

import java.net.SocketAddress;
import java.net.InetSocketAddress;

import com.linecorp.armeria.client.ClientRequestContext;
import com.linecorp.armeria.common.HttpRequest;
Expand Down Expand Up @@ -94,12 +94,12 @@ public void parse(HttpResponse response, TraceContext context, SpanCustomizer sp
span.tag(SpanTags.TAG_HTTP_SERIALIZATION_FORMAT, serFmt);
}

final SocketAddress raddr = ctx.remoteAddress();
final InetSocketAddress raddr = ctx.remoteAddress();
if (raddr != null) {
span.tag(SpanTags.TAG_ADDRESS_REMOTE, raddr.toString());
}

final SocketAddress laddr = ctx.localAddress();
final InetSocketAddress laddr = ctx.localAddress();
if (laddr != null) {
span.tag(SpanTags.TAG_ADDRESS_LOCAL, laddr.toString());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,7 @@

package com.linecorp.armeria.internal.common.brave;

import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;

import com.linecorp.armeria.common.RequestContext;
import com.linecorp.armeria.common.logging.RequestLog;
Expand Down Expand Up @@ -54,19 +52,9 @@ public static void logWireReceive(Span span, long wireReceiveTimeNanos, RequestL
}

public static boolean updateRemoteEndpoint(Span span, RequestContext ctx) {
final SocketAddress remoteAddress = ctx.remoteAddress();
final InetAddress address;
final int port;
if (remoteAddress instanceof InetSocketAddress) {
final InetSocketAddress socketAddress = (InetSocketAddress) remoteAddress;
address = socketAddress.getAddress();
port = socketAddress.getPort();
} else {
address = null;
port = 0;
}
if (address != null) {
return span.remoteIpAndPort(address.getHostAddress(), port);
final InetSocketAddress remoteAddress = ctx.remoteAddress();
if (remoteAddress != null) {
return span.remoteIpAndPort(remoteAddress.getAddress().getHostAddress(), remoteAddress.getPort());
}
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

import static java.util.Objects.requireNonNull;

import java.net.SocketAddress;
import java.net.InetSocketAddress;
import java.net.URI;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -200,12 +200,12 @@ public ClientRequestContextBuilder id(RequestId id) {
}

@Override
public ClientRequestContextBuilder remoteAddress(SocketAddress remoteAddress) {
public ClientRequestContextBuilder remoteAddress(InetSocketAddress remoteAddress) {
return (ClientRequestContextBuilder) super.remoteAddress(remoteAddress);
}

@Override
public ClientRequestContextBuilder localAddress(SocketAddress localAddress) {
public ClientRequestContextBuilder localAddress(InetSocketAddress localAddress) {
return (ClientRequestContextBuilder) super.localAddress(localAddress);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public HttpResponse execute(HttpRequest req, RequestOptions requestOptions) {
final RequestTarget reqTarget = RequestTarget.forClient(originalPath, prefix);
if (reqTarget == null) {
return abortRequestAndReturnFailureResponse(
req, new IllegalArgumentException("Invalid path: " + originalPath));
req, new IllegalArgumentException("Invalid request target: " + originalPath));
}

final EndpointGroup endpointGroup;
Expand Down
62 changes: 55 additions & 7 deletions core/src/main/java/com/linecorp/armeria/client/Endpoint.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import java.net.Inet4Address;
import java.net.Inet6Address;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.StandardProtocolFamily;
import java.net.URI;
import java.net.URISyntaxException;
Expand Down Expand Up @@ -49,6 +51,7 @@
import com.linecorp.armeria.common.SessionProtocol;
import com.linecorp.armeria.common.annotation.Nullable;
import com.linecorp.armeria.common.annotation.UnstableApi;
import com.linecorp.armeria.common.util.DomainSocketAddress;
import com.linecorp.armeria.common.util.UnmodifiableFuture;
import com.linecorp.armeria.internal.common.util.TemporaryThreadLocals;

Expand Down Expand Up @@ -105,6 +108,14 @@ public static Endpoint parse(String authority) {
});
}

private static String removeUserInfo(String authority) {
final int indexOfDelimiter = authority.lastIndexOf('@');
if (indexOfDelimiter == -1) {
return authority;
}
return authority.substring(indexOfDelimiter + 1);
}

/**
* Creates a new host {@link Endpoint}.
*
Expand All @@ -125,6 +136,37 @@ public static Endpoint of(String host) {
return create(host, 0, true);
}

/**
* Creates a new {@link Endpoint} from the specified {@link SocketAddress}.
* This method converts the following address types into an endpoint:
* <ul>
* <li>{@link InetSocketAddress}</li>
* <li>{@link DomainSocketAddress}</li>
* <li>{@link io.netty.channel.unix.DomainSocketAddress}</li>
* </ul>
*
* @throws IllegalArgumentException if the specified {@link SocketAddress} is not supported
*/
public static Endpoint of(SocketAddress addr) {
requireNonNull(addr, "addr");
if (addr instanceof io.netty.channel.unix.DomainSocketAddress) {
addr = DomainSocketAddress.of((io.netty.channel.unix.DomainSocketAddress) addr);
}

if (addr instanceof DomainSocketAddress) {
return of(((DomainSocketAddress) addr).authority());
}

checkArgument(addr instanceof InetSocketAddress,
"unsupported address: %s", addr);

final InetSocketAddress inetAddr = (InetSocketAddress) addr;
@SuppressWarnings("resource")
final Endpoint endpoint = of(inetAddr.getHostString(), inetAddr.getPort());
return inetAddr.isUnresolved() ? endpoint
: endpoint.withIpAddr(inetAddr.getAddress().getHostAddress());
}

/**
* Creates a new host {@link Endpoint} <strong>without</strong> validation.
*
Expand Down Expand Up @@ -152,18 +194,17 @@ private static Endpoint create(String host, int port, boolean validateHost) {
return new Endpoint(ipV6Addr, ipV6Addr, port, DEFAULT_WEIGHT, HostType.IPv6_ONLY, null);
}

if (validateHost) {
if (validateHost && !isDomainSocketAuthority(host)) {
host = InternetDomainName.from(host).toString();
}
return new Endpoint(host, null, port, DEFAULT_WEIGHT, HostType.HOSTNAME_ONLY, null);
}

private static String removeUserInfo(String authority) {
final int indexOfDelimiter = authority.lastIndexOf('@');
if (indexOfDelimiter == -1) {
return authority;
}
return authority.substring(indexOfDelimiter + 1);
static boolean isDomainSocketAuthority(String host) {
// Return true if `host` starts with `unix%3A` or `unix%3a`.
return host.length() > 7 &&
host.startsWith("unix%3") &&
Character.toUpperCase(host.charAt(6)) == 'A';
}

private enum HostType {
Expand Down Expand Up @@ -363,6 +404,13 @@ public StandardProtocolFamily ipFamily() {
}
}

/**
* Returns whether this endpoint connects to a domain socket.
*/
public boolean isDomainSocket() {
return isDomainSocketAuthority(host);
}

/**
* Returns the port number of this endpoint.
*
Expand Down
Loading

0 comments on commit e65ad13

Please sign in to comment.