Skip to content

Commit

Permalink
updates to test native epoll
Browse files Browse the repository at this point in the history
  • Loading branch information
nmittler committed May 14, 2015
1 parent c3125be commit 8f537e3
Show file tree
Hide file tree
Showing 7 changed files with 143 additions and 38 deletions.
4 changes: 3 additions & 1 deletion benchmarks/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@ dependencies {
project(':grpc-integration-testing'),
libraries.junit,
libraries.mockito,
libraries.hdrhistogram
libraries.hdrhistogram,
libraries.netty_tcnative,
libraries.netty_transport_native_epoll

alpnboot alpnboot_package_name
}
Expand Down
26 changes: 24 additions & 2 deletions benchmarks/src/main/java/io/grpc/benchmarks/qps/AsyncClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import static io.grpc.benchmarks.qps.ClientConfiguration.Builder.Option.DIRECTEXECUTOR;
import static io.grpc.benchmarks.qps.ClientConfiguration.Builder.Option.DURATION;
import static io.grpc.benchmarks.qps.ClientConfiguration.Builder.Option.HOST;
import static io.grpc.benchmarks.qps.ClientConfiguration.Builder.Option.NETTY_NATIVE_TRANSPORT;
import static io.grpc.benchmarks.qps.ClientConfiguration.Builder.Option.OKHTTP;
import static io.grpc.benchmarks.qps.ClientConfiguration.Builder.Option.OUTSTANDING_RPCS;
import static io.grpc.benchmarks.qps.ClientConfiguration.Builder.Option.PORT;
Expand All @@ -61,6 +62,7 @@

import grpc.testing.Qpstest.Payload;
import grpc.testing.TestServiceGrpc;

import io.grpc.Channel;
import io.grpc.ChannelImpl;
import io.grpc.Status;
Expand All @@ -69,7 +71,14 @@
import io.grpc.transport.netty.NegotiationType;
import io.grpc.transport.netty.NettyChannelBuilder;
import io.grpc.transport.okhttp.OkHttpChannelBuilder;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.epoll.EpollSocketChannel;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslProvider;

import org.HdrHistogram.Histogram;
import org.HdrHistogram.HistogramIterationValue;

Expand Down Expand Up @@ -185,10 +194,23 @@ private Channel newChannel() throws IOException {
// Force the hostname to match the cert the server uses.
address = InetAddress.getByAddress("foo.test.google.fr", address.getAddress());
File cert = loadCert("ca.pem");
context = GrpcSslContexts.forClient().trustManager(cert).build();
context = GrpcSslContexts.forClient().trustManager(cert)
.sslProvider(config.nettyNativeTransport ? SslProvider.OPENSSL : SslProvider.JDK)
.build();
}
final EventLoopGroup group;
final Class<? extends io.netty.channel.Channel> channelType;
if (config.nettyNativeTransport) {
group = new EpollEventLoopGroup();
channelType = EpollSocketChannel.class;
} else {
group = new NioEventLoopGroup();
channelType = NioSocketChannel.class;
}
return NettyChannelBuilder
.forAddress(new InetSocketAddress(address, config.port))
.eventLoopGroup(group)
.channelType(channelType)
.negotiationType(negotiationType)
.executor(config.directExecutor ? MoreExecutors.newDirectExecutorService() : null)
.sslContext(context)
Expand Down Expand Up @@ -369,7 +391,7 @@ public static void main(String... args) throws Exception {
.addOptions(CLIENT_PAYLOAD, SERVER_PAYLOAD, TLS, TESTCA)
.addOptions(OKHTTP, DURATION, WARMUP_DURATION, DIRECTEXECUTOR)
.addOptions(SAVE_HISTOGRAM, STREAMING_RPCS, CONNECTION_WINDOW)
.addOptions(STREAM_WINDOW);
.addOptions(STREAM_WINDOW, NETTY_NATIVE_TRANSPORT);
ClientConfiguration config;
try {
config = configBuilder.build(args);
Expand Down
37 changes: 32 additions & 5 deletions benchmarks/src/main/java/io/grpc/benchmarks/qps/AsyncServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,20 @@
import com.google.protobuf.ByteString;

import grpc.testing.TestServiceGrpc;

import io.grpc.ServerImpl;
import io.grpc.Status;
import io.grpc.stub.StreamObserver;
import io.grpc.transport.netty.GrpcSslContexts;
import io.grpc.transport.netty.NettyServerBuilder;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.ServerChannel;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.epoll.EpollServerSocketChannel;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslProvider;

import java.io.File;
import java.util.concurrent.TimeUnit;
Expand All @@ -62,6 +70,7 @@ public class AsyncServer {
private int connectionWindow = NettyServerBuilder.DEFAULT_CONNECTION_WINDOW_SIZE;
private int streamWindow = NettyServerBuilder.DEFAULT_STREAM_WINDOW_SIZE;
private boolean directExecutor;
private boolean nettyNativeTransport;

/**
* checkstyle complains if there is no javadoc comment here.
Expand All @@ -79,19 +88,35 @@ public void run(String[] args) throws Exception {
SslContext sslContext = null;
if (tls) {
System.out.println("Using fake CA for TLS certificate.\n"
+ "Run the Java client with --tls --testca");
+ "Run the Java client with --tls --testca");

File cert = loadCert("server1.pem");
File key = loadCert("server1.key");
sslContext = GrpcSslContexts.forServer(cert, key).build();
sslContext = GrpcSslContexts.forServer(cert, key)
.sslProvider(nettyNativeTransport ? SslProvider.OPENSSL : SslProvider.JDK).build();
}

if (port == 0) {
port = pickUnusedPort();
}

final EventLoopGroup boss;
final EventLoopGroup worker;
final Class<? extends ServerChannel> channelType;
if (nettyNativeTransport) {
boss = new EpollEventLoopGroup();
worker = new EpollEventLoopGroup();
channelType = EpollServerSocketChannel.class;
} else {
boss = new NioEventLoopGroup();
worker = new NioEventLoopGroup();
channelType = NioServerSocketChannel.class;
}
final ServerImpl server = NettyServerBuilder
.forPort(port)
.bossEventLoopGroup(boss)
.workerEventLoopGroup(worker)
.channelType(channelType)
.addService(TestServiceGrpc.bindService(new TestServiceImpl()))
.sslContext(sslContext)
.executor(directExecutor ? MoreExecutors.newDirectExecutorService() : null)
Expand Down Expand Up @@ -145,6 +170,8 @@ private boolean parseArgs(String[] args) {
connectionWindow = Integer.parseInt(value);
} else if ("stream_window".equals(key)) {
streamWindow = Integer.parseInt(value);
} else if ("netty_native_transport".equals(key)) {
nettyNativeTransport = true;
} else {
System.err.println("Unrecognized argument '" + key + "'.");
}
Expand All @@ -159,7 +186,6 @@ private boolean parseArgs(String[] args) {
}

private void printUsage() {
AsyncServer s = new AsyncServer();
System.out.println(
"Usage: [ARGS...]"
+ "\n"
Expand All @@ -168,6 +194,8 @@ private void printUsage() {
+ "\n --directexecutor Use a direct executor i.e. execute all RPC"
+ "\n calls directly in Netty's event loop"
+ "\n overhead of a thread pool."
+ "\n --netty_native_transport Whether to use Netty's native transport."
+ "\n Only supported on linux."
+ "\n --connection_window=BYTES The HTTP/2 connection flow control window."
+ "\n Default " + connectionWindow + " byte."
+ "\n --stream_window=BYTES The HTTP/2 per-stream flow control window."
Expand Down Expand Up @@ -223,8 +251,7 @@ private static SimpleResponse buildSimpleResponse(SimpleRequest request) {
PayloadType type = request.getResponseType();

Payload payload = Payload.newBuilder().setType(type).setBody(body).build();
SimpleResponse response = SimpleResponse.newBuilder().setPayload(payload).build();
return response;
return SimpleResponse.newBuilder().setPayload(payload).build();
}
return SimpleResponse.getDefaultInstance();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ class ClientConfiguration {
boolean tls;
boolean testca;
boolean directExecutor;
boolean nettyNativeTransport;
int port;
int channels = 4;
int outstandingRpcsPerChannel = 10;
Expand Down Expand Up @@ -206,18 +207,27 @@ public void applyNew(ClientConfiguration config, String value) {
@Override
public void applyNew(ClientConfiguration config, String value) {
config.tls = true;
if (!value.isEmpty()) {
config.tls = Boolean.parseBoolean(value);
}
}
}),
TESTCA("", "Use the provided Test Certificate for TLS.", new Action() {
@Override
public void applyNew(ClientConfiguration config, String value) {
config.testca = true;
if (!value.isEmpty()) {
config.testca = Boolean.parseBoolean(value);
}
}
}),
OKHTTP("", "Use OkHttp as the Transport.", new Action() {
@Override
public void applyNew(ClientConfiguration config, String value) {
config.okhttp = true;
if (!value.isEmpty()) {
config.okhttp = Boolean.parseBoolean(value);
}
}
}),
DURATION("SECONDS", "Duration of the benchmark.", new Action() {
Expand All @@ -239,6 +249,20 @@ public void applyNew(ClientConfiguration config, String value) {
@Override
public void applyNew(ClientConfiguration config, String value) {
config.directExecutor = true;
if (!value.isEmpty()) {
config.directExecutor = Boolean.parseBoolean(value);
}
}
}),
NETTY_NATIVE_TRANSPORT("", "Whether to use Netty's native transport. Only supported when "
+ "using the Netty transport on Linux.",
new Action() {
@Override
public void applyNew(ClientConfiguration config, String value) {
config.nettyNativeTransport = true;
if (!value.isEmpty()) {
config.nettyNativeTransport = Boolean.parseBoolean(value);
}
}
}),
SAVE_HISTOGRAM("FILE", "Write the histogram with the latency recordings to file.",
Expand Down
2 changes: 2 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,8 @@ subprojects {
protobuf_plugin: 'com.google.protobuf:protobuf-gradle-plugin:0.4.1',

netty: 'io.netty:netty-codec-http2:4.1.0.Beta5',
netty_tcnative: "io.netty:netty-tcnative:1.1.33.Fork2:${osdetector.classifier}",
netty_transport_native_epoll: "io.netty:netty-transport-native-epoll:4.1.0.Beta5:${osdetector.classifier}",

// Test dependencies.
junit: 'junit:junit:4.11',
Expand Down
78 changes: 57 additions & 21 deletions netty/src/main/java/io/grpc/transport/netty/Http2Negotiator.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import io.netty.channel.Channel;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.http.DefaultHttpRequest;
Expand All @@ -48,6 +49,7 @@
import io.netty.handler.codec.http2.Http2ClientUpgradeCodec;
import io.netty.handler.codec.http2.Http2ConnectionHandler;
import io.netty.handler.codec.http2.Http2OrHttpChooser;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslHandler;
import io.netty.handler.ssl.SslHandshakeCompletionEvent;
import io.netty.util.concurrent.Future;
Expand All @@ -56,6 +58,7 @@
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.net.InetSocketAddress;
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Collections;
Expand All @@ -65,6 +68,7 @@
import java.util.logging.Logger;

import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLParameters;

/**
* A utility class that provides support methods for negotiating the use of HTTP/2 with the remote
Expand Down Expand Up @@ -93,8 +97,11 @@ private Http2Negotiator() {
*/
public static ChannelHandler serverTls(SSLEngine sslEngine) {
Preconditions.checkNotNull(sslEngine, "sslEngine");
if (!installJettyTlsProtocolSelection(sslEngine, SettableFuture.<Void>create(), true)) {
throw new IllegalStateException("NPN/ALPN extensions not installed");
if (!isOpenSsl(sslEngine.getClass())) {
// Using JDK SSL
if (!installJettyTlsProtocolSelection(sslEngine, SettableFuture.<Void>create(), true)) {
throw new IllegalStateException("NPN/ALPN extensions not installed");
}
}
return new SslHandler(sslEngine, false);
}
Expand All @@ -104,26 +111,48 @@ public static ChannelHandler serverTls(SSLEngine sslEngine) {
* be negotiated, the {@code handler} is added and writes to the {@link Channel} may happen
* immediately, even before the TLS Handshake is complete.
*/
public static ChannelHandler tls(final SSLEngine sslEngine, final ChannelHandler handler) {
Preconditions.checkNotNull(sslEngine, "sslEngine");
final SettableFuture<Void> completeFuture = SettableFuture.create();
if (!installJettyTlsProtocolSelection(sslEngine, completeFuture, false)) {
throw new IllegalStateException("NPN/ALPN extensions not installed");
}
SslHandler sslHandler = new SslHandler(sslEngine, false);
sslHandler.handshakeFuture().addListener(
new GenericFutureListener<Future<? super Channel>>() {
@Override
public void operationComplete(Future<? super Channel> future) throws Exception {
// If an error occurred during the handshake, throw it to the pipeline.
if (future.isSuccess()) {
completeFuture.get();
} else {
future.get();
}
public static ChannelHandler tls(final SslContext sslContext, final InetSocketAddress inetAddress,
final ChannelHandler handler) {
Preconditions.checkNotNull(sslContext, "sslContext");
Preconditions.checkNotNull(inetAddress, "inetAddress");

ChannelHandler sslBootstrapHandler = new ChannelHandlerAdapter() {
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
// TODO(nmittler): This method is currently unsupported for OpenSSL. Need to fix in Netty.
SSLEngine sslEngine = sslEngine = sslContext.newEngine(ctx.alloc(),
inetAddress.getHostName(), inetAddress.getPort());
SSLParameters sslParams = new SSLParameters();
sslParams.setEndpointIdentificationAlgorithm("HTTPS");
sslEngine.setSSLParameters(sslParams);

final SettableFuture<Void> completeFuture = SettableFuture.create();
if (isOpenSsl(sslContext.getClass())) {
completeFuture.set(null);
} else {
// Using JDK SSL
if (!installJettyTlsProtocolSelection(sslEngine, completeFuture, false)) {
throw new IllegalStateException("NPN/ALPN extensions not installed");
}
});
return new BufferUntilTlsNegotiatedHandler(sslHandler, handler);
}

SslHandler sslHandler = new SslHandler(sslEngine, false);
sslHandler.handshakeFuture().addListener(
new GenericFutureListener<Future<? super Channel>>() {
@Override
public void operationComplete(Future<? super Channel> future) throws Exception {
// If an error occurred during the handshake, throw it to the pipeline.
if (future.isSuccess()) {
completeFuture.get();
} else {
future.get();
}
}
});
ctx.pipeline().replace(this, "sslHandler", sslHandler);
}
};
return new BufferUntilTlsNegotiatedHandler(sslBootstrapHandler, handler);
}

/**
Expand All @@ -148,6 +177,13 @@ public static ChannelHandler plaintext(final ChannelHandler handler) {
return new BufferUntilChannelActiveHandler(handler);
}

/**
* Returns {@code true} if the given class is for use with Netty OpenSsl.
*/
private static boolean isOpenSsl(Class<?> clazz) {
return clazz.getSimpleName().toLowerCase().contains("openssl");
}

/**
* Buffers all writes until either {@link #writeBufferedAndRemove(ChannelHandlerContext)} or
* {@link #failBufferedAndClose(ChannelHandlerContext)} is called. This handler allows us to
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,7 @@
import java.util.logging.Logger;

import javax.annotation.concurrent.GuardedBy;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLException;
import javax.net.ssl.SSLParameters;

/**
* A Netty-based {@link ClientTransport} implementation.
Expand Down Expand Up @@ -138,13 +136,7 @@ class NettyClientTransport implements ClientTransport {
throw new RuntimeException(ex);
}
}
// TODO(ejona86): specify allocator. The method currently ignores it though.
SSLEngine sslEngine
= sslContext.newEngine(null, inetAddress.getHostString(), inetAddress.getPort());
SSLParameters sslParams = new SSLParameters();
sslParams.setEndpointIdentificationAlgorithm("HTTPS");
sslEngine.setSSLParameters(sslParams);
negotiationHandler = Http2Negotiator.tls(sslEngine, handler);
negotiationHandler = Http2Negotiator.tls(sslContext, inetAddress, handler);
ssl = true;
break;
default:
Expand Down

0 comments on commit 8f537e3

Please sign in to comment.