diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2MultiplexCodec.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2MultiplexCodec.java index b13be516f1d1..55d9add47f3c 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2MultiplexCodec.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2MultiplexCodec.java @@ -41,7 +41,6 @@ import io.netty.util.ReferenceCountUtil; import io.netty.util.ReferenceCounted; import io.netty.util.internal.StringUtil; -import io.netty.util.internal.ThrowableUtil; import io.netty.util.internal.UnstableApi; import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; @@ -52,6 +51,8 @@ import java.util.ArrayDeque; import java.util.Queue; import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; +import java.util.concurrent.atomic.AtomicLongFieldUpdater; import static io.netty.handler.codec.http2.Http2CodecUtil.HTTP_UPGRADE_STREAM_ID; import static io.netty.handler.codec.http2.Http2CodecUtil.isStreamIdValid; @@ -146,10 +147,15 @@ public Handle newHandle() { } } + private static final AtomicLongFieldUpdater TOTAL_PENDING_SIZE_UPDATER = + AtomicLongFieldUpdater.newUpdater(DefaultHttp2StreamChannel.class, "totalPendingSize"); + + private static final AtomicIntegerFieldUpdater UNWRITABLE_UPDATER = + AtomicIntegerFieldUpdater.newUpdater(DefaultHttp2StreamChannel.class, "unwritable"); + private final ChannelHandler inboundStreamHandler; private final ChannelHandler upgradeStreamHandler; - private int initialOutboundStreamWindow = Http2CodecUtil.DEFAULT_WINDOW_SIZE; private boolean parentReadInProgress; private int idCount; @@ -230,21 +236,13 @@ final void onHttp2Frame(ChannelHandlerContext ctx, Http2Frame frame) { if (frame instanceof Http2StreamFrame) { Http2StreamFrame streamFrame = (Http2StreamFrame) frame; ((Http2MultiplexCodecStream) streamFrame.stream()).channel.fireChildRead(streamFrame); - } else if (frame instanceof Http2GoAwayFrame) { + return; + } + if (frame instanceof Http2GoAwayFrame) { onHttp2GoAwayFrame(ctx, (Http2GoAwayFrame) frame); - // Allow other handlers to act on GOAWAY frame - ctx.fireChannelRead(frame); - } else if (frame instanceof Http2SettingsFrame) { - Http2Settings settings = ((Http2SettingsFrame) frame).settings(); - if (settings.initialWindowSize() != null) { - initialOutboundStreamWindow = settings.initialWindowSize(); - } - // Allow other handlers to act on SETTINGS frame - ctx.fireChannelRead(frame); - } else { - // Send any other frames down the pipeline - ctx.fireChannelRead(frame); } + // Send frames down the pipeline + ctx.fireChannelRead(frame); } private void onHttp2UpgradeStreamInitialized(ChannelHandlerContext ctx, Http2MultiplexCodecStream stream) { @@ -294,11 +292,6 @@ final void onHttp2StreamStateChanged(ChannelHandlerContext ctx, Http2FrameStream } } - @Override - final void onHttp2StreamWritabilityChanged(ChannelHandlerContext ctx, Http2FrameStream stream, boolean writable) { - (((Http2MultiplexCodecStream) stream).channel).writabilityChanged(writable); - } - // TODO: This is most likely not the best way to expose this, need to think more about it. final Http2StreamChannel newOutboundStream() { return new DefaultHttp2StreamChannel(newStream(), true); @@ -399,6 +392,21 @@ public final void channelRead(ChannelHandlerContext ctx, Object msg) throws Exce super.channelRead(ctx, msg); } + @Override + public final void channelWritabilityChanged(final ChannelHandlerContext ctx) throws Exception { + forEachActiveStream(new Http2FrameStreamVisitor() { + @Override + public boolean visit(Http2FrameStream stream) { + final DefaultHttp2StreamChannel childChannel = ((Http2MultiplexCodecStream) stream).channel; + // As the writability may change during visiting active streams we need to ensure we always fetch + // the current writability state of the channel. + childChannel.updateWritability(ctx.channel().isWritable()); + return true; + } + }); + super.channelWritabilityChanged(ctx); + } + final void onChannelReadComplete(ChannelHandlerContext ctx) { // If we have many child channel we can optimize for the case when multiple call flush() in // channelReadComplete(...) callbacks and only do it once as otherwise we will end-up with multiple @@ -421,13 +429,6 @@ static final class Http2MultiplexCodecStream extends DefaultHttp2FrameStream { DefaultHttp2StreamChannel channel; } - private boolean initialWritability(DefaultHttp2FrameStream stream) { - // If the stream id is not valid yet we will just mark the channel as writable as we will be notified - // about non-writability state as soon as the first Http2HeaderFrame is written (if needed). - // This should be good enough and simplify things a lot. - return !isStreamIdValid(stream.id()) || isWritable(stream); - } - /** * The current status of the read-processing for a {@link Http2StreamChannel}. */ @@ -448,7 +449,6 @@ private enum ReadStatus { REQUESTED } - // TODO: Handle writability changes due writing from outside the eventloop. private final class DefaultHttp2StreamChannel extends DefaultAttributeMap implements Http2StreamChannel { private final Http2StreamChannelConfig config = new Http2StreamChannelConfig(this); private final Http2ChannelUnsafe unsafe = new Http2ChannelUnsafe(); @@ -459,8 +459,13 @@ private final class DefaultHttp2StreamChannel extends DefaultAttributeMap implem private final boolean outbound; private volatile boolean registered; - // We start with the writability of the channel when creating the StreamChannel. - private volatile boolean writable; + + // Needs to be package-private to be able to access it from the outer-class AtomicLongFieldUpdater. + volatile long totalPendingSize; + volatile int unwritable; + + // Cached to reduce GC + private Runnable fireChannelWritabilityChangedTask; private boolean outboundClosed; @@ -487,23 +492,114 @@ private final class DefaultHttp2StreamChannel extends DefaultAttributeMap implem DefaultHttp2StreamChannel(DefaultHttp2FrameStream stream, boolean outbound) { this.stream = stream; this.outbound = outbound; - writable = initialWritability(stream); ((Http2MultiplexCodecStream) stream).channel = this; pipeline = new DefaultChannelPipeline(this) { @Override protected void incrementPendingOutboundBytes(long size) { - // Do thing for now + DefaultHttp2StreamChannel.this.incrementPendingOutboundBytes(size, true); } @Override protected void decrementPendingOutboundBytes(long size) { - // Do thing for now + DefaultHttp2StreamChannel.this.decrementPendingOutboundBytes(size, true); } }; closePromise = pipeline.newPromise(); channelId = new Http2StreamChannelId(parent().id(), ++idCount); } + private void incrementPendingOutboundBytes(long size, boolean invokeLater) { + if (size == 0) { + return; + } + + long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, size); + if (newWriteBufferSize > config().getWriteBufferHighWaterMark()) { + setUnwritable(invokeLater); + } + } + + private void decrementPendingOutboundBytes(long size, boolean invokeLater) { + if (size == 0) { + return; + } + + long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, -size); + // Once the totalPendingSize dropped below the low water-mark we can mark the child channel + // as writable again. Before doing so we also need to ensure the parent channel is writable to + // prevent excessive buffering in the parent outbound buffer. If the parent is not writable + // we will mark the child channel as writable once the parent becomes writable by calling + // updateWritability later. + if (newWriteBufferSize < config().getWriteBufferLowWaterMark() && parent().isWritable()) { + setWritable(invokeLater); + } + } + + void updateWritability(boolean parentIsWritable) { + if (parentIsWritable) { + // The parent is writable again but the child channel itself may still not be writable. + // Lets try to set the child channel writable to match the state of the parent channel + // if (and only if) the totalPendingSize is smaller then the low water-mark. + // If this is not the case we will try again later once we drop under it. + trySetWritable(); + } else { + // No matter what the current totalPendingSize for the child channel is as soon as the parent + // channel is unwritable we also need to mark the child channel as unwritable to try to keep + // buffering to a minimum. + setUnwritable(false); + } + } + + private void trySetWritable() { + if (totalPendingSize < config().getWriteBufferLowWaterMark()) { + setWritable(false); + } + } + + private void setWritable(boolean invokeLater) { + for (;;) { + final int oldValue = unwritable; + final int newValue = oldValue & ~1; + if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) { + if (oldValue != 0 && newValue == 0) { + fireChannelWritabilityChanged(invokeLater); + } + break; + } + } + } + + private void setUnwritable(boolean invokeLater) { + for (;;) { + final int oldValue = unwritable; + final int newValue = oldValue | 1; + if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) { + if (oldValue == 0 && newValue != 0) { + fireChannelWritabilityChanged(invokeLater); + } + break; + } + } + } + + private void fireChannelWritabilityChanged(boolean invokeLater) { + final ChannelPipeline pipeline = pipeline(); + if (invokeLater) { + Runnable task = fireChannelWritabilityChangedTask; + if (task == null) { + fireChannelWritabilityChangedTask = task = new Runnable() { + @Override + public void run() { + pipeline.fireChannelWritabilityChanged(); + } + }; + } + eventLoop().execute(task); + } else { + pipeline.fireChannelWritabilityChanged(); + } + } + @Override public Http2FrameStream stream() { return stream; @@ -538,7 +634,7 @@ public boolean isActive() { @Override public boolean isWritable() { - return writable; + return unwritable == 0 && parent().isWritable(); } @Override @@ -578,13 +674,25 @@ public ChannelFuture closeFuture() { @Override public long bytesBeforeUnwritable() { - // TODO: Do a proper impl - return config().getWriteBufferHighWaterMark(); + long bytes = config().getWriteBufferHighWaterMark() - totalPendingSize; + // If bytes is negative we know we are not writable, but if bytes is non-negative we have to check + // writability. Note that totalPendingSize and isWritable() use different volatile variables that are not + // synchronized together. totalPendingSize will be updated before isWritable(). + if (bytes > 0) { + return isWritable() ? bytes : 0; + } + return 0; } @Override public long bytesBeforeWritable() { - // TODO: Do a proper impl + long bytes = totalPendingSize - config().getWriteBufferLowWaterMark(); + // If bytes is negative we know we are writable, but if bytes is non-negative we have to check writability. + // Note that totalPendingSize and isWritable() use different volatile variables that are not synchronized + // together. totalPendingSize will be updated before isWritable(). + if (bytes > 0) { + return isWritable() ? 0 : bytes; + } return 0; } @@ -744,15 +852,6 @@ public String toString() { return parent().toString() + "(H2 - " + stream + ')'; } - void writabilityChanged(boolean writable) { - assert eventLoop().inEventLoop(); - if (writable != this.writable && isActive()) { - // Only notify if we received a state change. - this.writable = writable; - pipeline().fireChannelWritabilityChanged(); - } - } - /** * Receive a read message. This does not notify handlers unless a read is in progress on the * channel. @@ -1105,14 +1204,17 @@ public void write(Object msg, final ChannelPromise promise) { return; } firstFrameWritten = true; - ChannelFuture future = write0(frame); - if (future.isDone()) { - firstWriteComplete(future, promise); + ChannelFuture f = write0(frame); + if (f.isDone()) { + firstWriteComplete(f, promise); } else { - future.addListener(new ChannelFutureListener() { + final long bytes = FlowControlledFrameSizeEstimator.HANDLE_INSTANCE.size(msg); + incrementPendingOutboundBytes(bytes, false); + f.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) { firstWriteComplete(future, promise); + decrementPendingOutboundBytes(bytes, false); } }); } @@ -1127,14 +1229,17 @@ public void operationComplete(ChannelFuture future) { return; } - ChannelFuture future = write0(msg); - if (future.isDone()) { - writeComplete(future, promise); + ChannelFuture f = write0(msg); + if (f.isDone()) { + writeComplete(f, promise); } else { - future.addListener(new ChannelFutureListener() { + final long bytes = FlowControlledFrameSizeEstimator.HANDLE_INSTANCE.size(msg); + incrementPendingOutboundBytes(bytes, false); + f.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) { writeComplete(future, promise); + decrementPendingOutboundBytes(bytes, false); } }); } @@ -1148,9 +1253,6 @@ public void operationComplete(ChannelFuture future) { private void firstWriteComplete(ChannelFuture future, ChannelPromise promise) { Throwable cause = future.cause(); if (cause == null) { - // As we just finished our first write which made the stream-id valid we need to re-evaluate - // the writability of the channel. - writabilityChanged(Http2MultiplexCodec.this.isWritable(stream)); promise.setSuccess(); } else { // If the first write fails there is not much we can do, just close @@ -1243,49 +1345,16 @@ private final class Http2StreamChannelConfig extends DefaultChannelConfig { super(channel); } - @Override - public int getWriteBufferHighWaterMark() { - return min(parent().config().getWriteBufferHighWaterMark(), initialOutboundStreamWindow); - } - - @Override - public int getWriteBufferLowWaterMark() { - return min(parent().config().getWriteBufferLowWaterMark(), initialOutboundStreamWindow); - } - @Override public MessageSizeEstimator getMessageSizeEstimator() { return FlowControlledFrameSizeEstimator.INSTANCE; } - @Override - public WriteBufferWaterMark getWriteBufferWaterMark() { - int mark = getWriteBufferHighWaterMark(); - return new WriteBufferWaterMark(mark, mark); - } - @Override public ChannelConfig setMessageSizeEstimator(MessageSizeEstimator estimator) { throw new UnsupportedOperationException(); } - @Override - @Deprecated - public ChannelConfig setWriteBufferHighWaterMark(int writeBufferHighWaterMark) { - throw new UnsupportedOperationException(); - } - - @Override - @Deprecated - public ChannelConfig setWriteBufferLowWaterMark(int writeBufferLowWaterMark) { - throw new UnsupportedOperationException(); - } - - @Override - public ChannelConfig setWriteBufferWaterMark(WriteBufferWaterMark writeBufferWaterMark) { - throw new UnsupportedOperationException(); - } - @Override public ChannelConfig setRecvByteBufAllocator(RecvByteBufAllocator allocator) { if (!(allocator.newHandle() instanceof RecvByteBufAllocator.ExtendedHandle)) { diff --git a/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2MultiplexCodecTest.java b/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2MultiplexCodecTest.java index b01fe69c0a57..7cfc9502ff97 100644 --- a/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2MultiplexCodecTest.java +++ b/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2MultiplexCodecTest.java @@ -22,7 +22,9 @@ import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.channel.ChannelOutboundHandlerAdapter; import io.netty.channel.ChannelPromise; +import io.netty.channel.WriteBufferWaterMark; import io.netty.channel.embedded.EmbeddedChannel; import io.netty.handler.codec.http.HttpMethod; import io.netty.handler.codec.http.HttpScheme; @@ -53,6 +55,7 @@ import static io.netty.util.ReferenceCountUtil.release; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; @@ -674,38 +677,45 @@ public void outboundFlowControlWritability() { parentChannel.flush(); // Test for initial window size - assertEquals(initialRemoteStreamWindow, childChannel.config().getWriteBufferHighWaterMark()); + assertTrue(initialRemoteStreamWindow < childChannel.config().getWriteBufferHighWaterMark()); assertTrue(childChannel.isWritable()); childChannel.write(new DefaultHttp2DataFrame(Unpooled.buffer().writeZero(16 * 1024 * 1024))); + assertEquals(0, childChannel.bytesBeforeUnwritable()); assertFalse(childChannel.isWritable()); } @Test - public void writabilityAndFlowControl() { - LastInboundHandler inboundHandler = new LastInboundHandler(); - Http2StreamChannel childChannel = newInboundStream(3, false, inboundHandler); - assertEquals("", inboundHandler.writabilityStates()); - + public void writabilityOfParentIsRespected() { + Http2StreamChannel childChannel = newOutboundStream(new ChannelInboundHandlerAdapter()); + childChannel.config().setWriteBufferWaterMark(new WriteBufferWaterMark(2048, 4096)); + parentChannel.config().setWriteBufferWaterMark(new WriteBufferWaterMark(256, 512)); assertTrue(childChannel.isWritable()); - // HEADERS frames are not flow controlled, so they should not affect the flow control window. + assertTrue(parentChannel.isActive()); + childChannel.writeAndFlush(new DefaultHttp2HeadersFrame(new DefaultHttp2Headers())); - codec.onHttp2StreamWritabilityChanged(codec.ctx, childChannel.stream(), true); + parentChannel.flush(); assertTrue(childChannel.isWritable()); - assertEquals("", inboundHandler.writabilityStates()); - - codec.onHttp2StreamWritabilityChanged(codec.ctx, childChannel.stream(), true); + childChannel.write(new DefaultHttp2DataFrame(Unpooled.buffer().writeZero(256))); assertTrue(childChannel.isWritable()); - assertEquals("", inboundHandler.writabilityStates()); - - codec.onHttp2StreamWritabilityChanged(codec.ctx, childChannel.stream(), false); + childChannel.writeAndFlush(new DefaultHttp2DataFrame(Unpooled.buffer().writeZero(512))); + + long bytesBeforeUnwritable = childChannel.bytesBeforeUnwritable(); + assertNotEquals(0, bytesBeforeUnwritable); + // Add something to the ChannelOutboundBuffer of the parent to simulate queuing in the parents channel buffer + // and verify that this also effects the child channel in terms of writability. + parentChannel.unsafe().outboundBuffer().addMessage( + Unpooled.buffer().writeZero(800), 800, parentChannel.voidPromise()); + assertFalse(parentChannel.isWritable()); assertFalse(childChannel.isWritable()); - assertEquals("false", inboundHandler.writabilityStates()); + assertEquals(0, childChannel.bytesBeforeUnwritable()); - codec.onHttp2StreamWritabilityChanged(codec.ctx, childChannel.stream(), false); - assertFalse(childChannel.isWritable()); - assertEquals("false", inboundHandler.writabilityStates()); + // Flush everything which simulate writing everything to the socket. + parentChannel.flush(); + assertTrue(parentChannel.isWritable()); + assertTrue(childChannel.isWritable()); + assertEquals(bytesBeforeUnwritable, childChannel.bytesBeforeUnwritable()); } @Test