-
Notifications
You must be signed in to change notification settings - Fork 927
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Improve the cleanup of streaming responses #1119
Conversation
// One of the two cases: | ||
// - Client closed the connection too early. | ||
// - Response publisher aborted the stream. | ||
failAndReset((AbortedStreamException) cause); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
/cc @kojilin You may want to send a canary to make sure this PR fixes your problem.
@@ -197,16 +197,41 @@ public int unfinishedRequests() { | |||
} | |||
|
|||
@Override | |||
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { | |||
super.channelUnregistered(ctx); | |||
unfinishedResponses.keySet().forEach(StreamMessage::abort); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No idea why channelUnregistered
is used instead of channelInactive
but they don't make any difference, so I moved the cleanup logic to channelInactive
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The netty documentation on these callbacks wasn't very clear, so I implemented every method and added breakpoints to try to figure it out. This was called where I expected and unregistering from armeria when unregistering from netty made intuitive sense, but if they're similar semantics seems good.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
unregistration will always follow channelInactive
. Netty allows deregistration without closing for a very special case, but we don't use it.
@@ -373,17 +398,15 @@ private void handleRequest(ChannelHandlerContext ctx, DecodedHttpRequest req) th | |||
} | |||
})).exceptionally(CompletionActions::log); | |||
|
|||
res.completionFuture().handle(voidFunction((ret, cause) -> { | |||
res.completionFuture().handleAsync(voidFunction((ret, cause) -> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not really related with this PR, but thought this is cleaner.
if (data == null || !res.tryWrite(data)) { | ||
break; | ||
if (res.tryWrite(headers)) { | ||
for (;;) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
/cc @kojilin This should fix the IllegalStateException
issue you reported.
if (d == null || !res.tryWrite(d)) { | ||
break; | ||
if (res.tryWrite(headers)) { | ||
for (;;) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
/cc @kojilin This should fix the IllegalStateException
issue you reported.
@@ -97,6 +98,7 @@ public void jarBasedWebApp() throws Exception { | |||
.startsWith("application/java"); | |||
assertThat(res.getFirstHeader(HttpHeaderNames.CONTENT_LENGTH.toString()).getValue()) | |||
.isEqualTo("1361"); | |||
EntityUtils.consume(res.getEntity()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Without this, Apache HC will close the connection before reading the response fully.
@@ -110,6 +112,7 @@ public void jarBasedWebAppWithAlternativeRoot() throws Exception { | |||
.startsWith("application/java"); | |||
assertThat(res.getFirstHeader(HttpHeaderNames.CONTENT_LENGTH.toString()).getValue()) | |||
.isEqualTo("1361"); | |||
EntityUtils.consume(res.getEntity()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Without this, Apache HC will close the connection before reading the response fully.
case H1C: | ||
case H1: | ||
// XXX(trustin): How much time is 'a little bit'? | ||
ctx.channel().eventLoop().schedule(this::cleanup, 1, TimeUnit.SECONDS); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure if we should make this delay configurable. Thoughts?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Configuring just this knob is probably overkill - setting it based on idle timeout handler might make sense, but might also be overkill.
Codecov Report
@@ Coverage Diff @@
## master #1119 +/- ##
=========================================
- Coverage 72.48% 72.4% -0.09%
=========================================
Files 510 510
Lines 22784 22801 +17
Branches 2826 2830 +4
=========================================
- Hits 16514 16508 -6
- Misses 4764 4782 +18
- Partials 1506 1511 +5
Continue to review full report at Codecov.
|
@@ -197,16 +197,41 @@ public int unfinishedRequests() { | |||
} | |||
|
|||
@Override | |||
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { | |||
super.channelUnregistered(ctx); | |||
unfinishedResponses.keySet().forEach(StreamMessage::abort); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The netty documentation on these callbacks wasn't very clear, so I implemented every method and added breakpoints to try to figure it out. This was called where I expected and unregistering from armeria when unregistering from netty made intuitive sense, but if they're similar semantics seems good.
case H1C: | ||
case H1: | ||
// XXX(trustin): How much time is 'a little bit'? | ||
ctx.channel().eventLoop().schedule(this::cleanup, 1, TimeUnit.SECONDS); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Configuring just this knob is probably overkill - setting it based on idle timeout handler might make sense, but might also be overkill.
if (responseEncoder != null) { | ||
responseEncoder.close(); | ||
} | ||
|
||
unfinishedResponses.keySet().forEach(res -> { | ||
if (!res.isComplete()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Might be mistaken but I thought our semantics allowed aborting already completed streams (since completion is asynchronous I thought it's considered best effort vs other completion events)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, it's probably redundant. Let me run the tests again without this guard tomorrow and let you know.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Everything's fine without the guard. Removed.
a5043bd
to
fa7e791
Compare
cancelTimeout(); | ||
state = State.DONE; | ||
final State state = this.state; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
oldState
to make it clear?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed
Motivation: A successfully sent response can be logged with AbortedStreamException or ClosedChannelException due to race conditions. Modifications: - Make HttpResponseSubscriber handle AbortedStreamException and ClosedChannelException better so that they are not logged unnecessarily - Make HttpServerHandler clean up unfinished HTTP/1 responses a little bit later so that they are not aborted too early. - Miscellaneous: - Fix a bug in DeferredStreamMessage where NeverInvokedSubscriber is invoked, which happens when a DeferredStreamMessage is delegated to another DeferredStreamMessage. Result: - Much less chance of redundant AbortedStreamException or ClosedChannelException
fa7e791
to
02d282a
Compare
Thanks for reviewing. Will merge once CI passes. |
Thank you! |
Motivation: A successfully sent response can be logged with AbortedStreamException or ClosedChannelException due to race conditions. Modifications: - Make HttpResponseSubscriber handle AbortedStreamException and ClosedChannelException better so that they are not logged unnecessarily - Make HttpServerHandler clean up unfinished HTTP/1 responses a little bit later so that they are not aborted too early. - Miscellaneous: - Fix a bug in DeferredStreamMessage where NeverInvokedSubscriber is invoked, which happens when a DeferredStreamMessage is delegated to another DeferredStreamMessage. Result: - Much less chance of redundant AbortedStreamException or ClosedChannelException
Motivation: A successfully sent response can be logged with AbortedStreamException or ClosedChannelException due to race conditions. Modifications: - Make HttpResponseSubscriber handle AbortedStreamException and ClosedChannelException better so that they are not logged unnecessarily - Make HttpServerHandler clean up unfinished HTTP/1 responses a little bit later so that they are not aborted too early. - Miscellaneous: - Fix a bug in DeferredStreamMessage where NeverInvokedSubscriber is invoked, which happens when a DeferredStreamMessage is delegated to another DeferredStreamMessage. Result: - Much less chance of redundant AbortedStreamException or ClosedChannelException
Motivation:
A successfully sent response can be logged with AbortedStreamException
or ClosedChannelException due to race conditions.
Modifications:
ClosedChannelException better so that they are not logged
unnecessarily
bit later so that they are not aborted too early.
invoked, which happens when a DeferredStreamMessage is delegated to
another DeferredStreamMessage.
Result:
ClosedChannelException