-
Notifications
You must be signed in to change notification settings - Fork 927
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add WebSocketClient #4972
Add WebSocketClient #4972
Conversation
core/src/main/java/com/linecorp/armeria/client/DefaultWebClient.java
Outdated
Show resolved
Hide resolved
core/src/main/java/com/linecorp/armeria/internal/common/DefaultSplitHttpResponse.java
Outdated
Show resolved
Hide resolved
core/src/main/java/com/linecorp/armeria/client/websocket/WebSocketClient.java
Show resolved
Hide resolved
core/src/main/java/com/linecorp/armeria/client/websocket/WebSocketClientBuilder.java
Outdated
Show resolved
Hide resolved
core/src/main/java/com/linecorp/armeria/client/websocket/WebSocketClientBuilder.java
Outdated
Show resolved
Hide resolved
RequestHeaders headers = request.headers(); | ||
final HttpSession session = HttpSession.get(channel()); | ||
final SerializationFormat serializationFormat = session.serializationFormat(); | ||
if (serializationFormat == SerializationFormat.WS) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
HttpSession
is determined before starting subscription.We may add WebSocketHttp2RequestSubscriber
and move this specialized logic there?
import com.linecorp.armeria.common.RequestHeaders;
class WebSocketHttp2RequestSubscriber {
RequestHeaders mapHeaders(RequestHeaders headers) {
headers
.toBuilder()
.method(HttpMethod.CONNECT)
...
}
}
core/src/main/java/com/linecorp/armeria/client/ClientOptions.java
Outdated
Show resolved
Hide resolved
@@ -347,6 +359,10 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exc | |||
this.protocol = protocol; | |||
if (protocol == H1 || protocol == H1C) { | |||
final Http1ResponseDecoder responseDecoder = ctx.pipeline().get(Http1ResponseDecoder.class); | |||
final boolean webSocket = serializationFormat == SerializationFormat.WS; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Q: Would it be possible to replace Http1ResponseDecoder
with WebSocketResponseDecoder
to modulize the WebSocket domain?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Split it. PTAL. 😉
a94f576
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I thought we could completely replace Http1ResponseDecoder
. The current implementation looks good.
This PR is already huge. If necessary, additional modularization could be taken in the future.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changed to use WebSocketHttp1ClientChannelHandler
instead. PTAL. 😉
core/src/main/java/com/linecorp/armeria/client/HttpChannelPool.java
Outdated
Show resolved
Hide resolved
core/src/main/java/com/linecorp/armeria/client/AbstractHttpRequestHandler.java
Outdated
Show resolved
Hide resolved
core/src/main/java/com/linecorp/armeria/client/AbstractWebClientBuilder.java
Outdated
Show resolved
Hide resolved
core/src/main/java/com/linecorp/armeria/client/Http1ResponseDecoder.java
Outdated
Show resolved
Hide resolved
core/src/main/java/com/linecorp/armeria/client/Http1ResponseDecoder.java
Outdated
Show resolved
Hide resolved
core/src/main/java/com/linecorp/armeria/client/WebSocketHttp1RequestSubscriber.java
Outdated
Show resolved
Hide resolved
import com.linecorp.armeria.server.websocket.WebSocketServiceHandler; | ||
import com.linecorp.armeria.testing.junit5.server.ServerExtension; | ||
|
||
class WebSocketClientTest { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we also test against Jetty's WebSocket server? We could add it to :it:websocket
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You have caught my laziness. 🤣
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🤣
core/src/test/java/com/linecorp/armeria/client/websocket/WebSocketClientHandshakeTest.java
Show resolved
Hide resolved
core/src/main/java/com/linecorp/armeria/internal/common/websocket/WebSocketUtil.java
Outdated
Show resolved
Hide resolved
core/src/main/java/com/linecorp/armeria/internal/common/HttpHeadersUtil.java
Show resolved
Hide resolved
core/src/main/java/com/linecorp/armeria/internal/common/DefaultSplitHttpResponse.java
Outdated
Show resolved
Hide resolved
core/src/main/java/com/linecorp/armeria/client/websocket/DefaultWebSocketClient.java
Outdated
Show resolved
Hide resolved
core/src/main/java/com/linecorp/armeria/internal/common/DefaultSplitHttpResponse.java
Outdated
Show resolved
Hide resolved
core/src/main/java/com/linecorp/armeria/client/websocket/DefaultWebSocketClient.java
Outdated
Show resolved
Hide resolved
core/src/main/java/com/linecorp/armeria/client/websocket/DefaultWebSocketClient.java
Outdated
Show resolved
Hide resolved
core/src/main/java/com/linecorp/armeria/client/websocket/WebSocketSession.java
Show resolved
Hide resolved
@@ -347,6 +359,10 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exc | |||
this.protocol = protocol; | |||
if (protocol == H1 || protocol == H1C) { | |||
final Http1ResponseDecoder responseDecoder = ctx.pipeline().get(Http1ResponseDecoder.class); | |||
final boolean webSocket = serializationFormat == SerializationFormat.WS; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I thought we could completely replace Http1ResponseDecoder
. The current implementation looks good.
This PR is already huge. If necessary, additional modularization could be taken in the future.
return; | ||
} | ||
// Do not close the response right away to give a chance to WebSocketFrameDecoder to close the | ||
// response normally if it receives a close frame. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I remember we had a chat about this behavior before. If we close the response instead of abortion, we don't need the delay, right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't remember the conclusion of our discussion either 😅
Did we decide to close the connection from server-side? or did we decide to strictly follow the websocket rfc and close from client-side?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changed to close the delegate directly. 😉
Did we decide to close the connection from server-side? or did we decide to strictly follow the websocket rfc and close from client-side?
Because we can't control and guarantee that, we just close the connection after close frames are sent and received in both sides. 😉
super(delegate, eventLoop, ctx, responseTimeoutMillis, maxContentLength); | ||
final ClientRequestContextExtension extension = ctx.as(ClientRequestContextExtension.class); | ||
if (extension != null) { | ||
extension.setClosingResponseTask(cause -> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Technically, this is a setter method that sets a Consumer
to ctx.
For those usage, I think we may instead use attributes to store the Consumer
.
What do you think?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think in the future, we may also incorporate websocket ping/pong behavior to KeepAliveHandler
which may require a more extensive callback handler.
At the moment since the callback isn't exposed, I'm fine with either way.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changed to use attribute. BTW, I didn't change it for the server side because the original request might be used by others. Please, let me know if you think I need to change it as well. 😉
core/src/main/java/com/linecorp/armeria/common/stream/DeferredStreamMessage.java
Outdated
Show resolved
Hide resolved
core/src/main/java/com/linecorp/armeria/common/SerializationFormat.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Mostly looks good! Got through roughly 2/3 😅 Will take a look at the rest tomorrow
|
||
@Override | ||
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { | ||
keepAliveHandler.onReadOrWrite(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Note: KeepAliveHandler
at the moment won't support ping/pong
return; | ||
} | ||
// Do not close the response right away to give a chance to WebSocketFrameDecoder to close the | ||
// response normally if it receives a close frame. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't remember the conclusion of our discussion either 😅
Did we decide to close the connection from server-side? or did we decide to strictly follow the websocket rfc and close from client-side?
@@ -65,6 +65,9 @@ public enum SessionProtocol { | |||
|
|||
private static final Set<SessionProtocol> HTTPS_VALUES = Sets.immutableEnumSet(HTTPS, H1, H2); | |||
|
|||
private static final Set<SessionProtocol> HTTP_AND_HTTPS_VALUES = | |||
Sets.immutableEnumSet(HTTPS, HTTP, H1, H1C, H2, H2C); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sets.immutableEnumSet(HTTPS, HTTP, H1, H1C, H2, H2C); | |
Sets.union(HTTP_VALUES, HTTPS_VALUES); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sets.union
has unnecessary operations inside its method. For example, it checks if each set contains the element from the other set in size()
method:
public int size() {
int size = set1.size();
for (E e : set2) {
if (!set1.contains(e)) {
size++;
}
}
return size;
}
So let me just use the current logic. 😉
} else if (SessionProtocol.HTTPS == p) { | ||
schemes.put("wss", scheme); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Question) Why can't this be outside of the double for loop?
schemes.put("ws", new Scheme(SerializationFormat.WS, SessionProtocol.HTTP));
schemes.put("wss", new Scheme(SerializationFormat.WS, SessionProtocol.HTTPS));
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That will create two new Scheme(SerializationFormat.WS, SessionProtocol.HTTP)
instances whose references are different and it wouldn't work if we use ==
instead of scheme.equals(others)
.
if (internalHeaders.isEmpty()) { | ||
return headers; | ||
} | ||
if (internalHeaders.size() == 1 && internalHeaders.contains(HttpHeaderNames.USER_AGENT) && |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This looks like extra overhead since I think most requests will contain
- internalHeaders.authority
- internalHeaders.scheme
What was the motivation of this change?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That is correct! 👍 Let me just revert it and leave the following if condition. 😉
core/src/main/java/com/linecorp/armeria/client/websocket/WebSocketClientHandshakeException.java
Show resolved
Hide resolved
|
||
final class DefaultWebSocketClient implements WebSocketClient { | ||
|
||
static final WebSocketClient DEFAULT = new DefaultWebSocketClient(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why can't we just use the builder?
static final WebSocketClient DEFAULT = new DefaultWebSocketClient(); | |
static final WebSocketClient DEFAULT = new WebSocketClientBuilder(UNDEFINED_URI).build() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should've pushed my change before you left this comment. 🤣
@@ -438,7 +440,7 @@ private void failEarly(Throwable cause) { | |||
} | |||
|
|||
// TODO(ikhoon): Consider moving the logic for filling authority to `HttpClientDelegate.exceute()`. | |||
private void autoFillSchemeAndAuthority() { | |||
private void autoFillSchemeAuthorityAndOrigin() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
While double checking, it seems like the rfc specifies origin as optional.
- Optionally, an |Origin| header field. This header field is sent
by all browser clients. A connection attempt lacking this
header field SHOULD NOT be interpreted as coming from a browser
client.
I'm fine with this change if this is important for compatibility with other server implementations, but I wanted to double-check if we want to add this header by default.
ref: https://datatracker.ietf.org/doc/html/rfc6455#section-4.2.1
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Because we have decided to reject a WebSocket request if the origin header is missing from the WebSocketService
, I think we should send the origin header by default for the client. 😉
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Left a couple of comments but mostly looks good 👍
core/src/main/java/com/linecorp/armeria/common/stream/DeferredStreamMessage.java
Outdated
Show resolved
Hide resolved
}); | ||
|
||
final CompletableFuture<WebSocketSession> result = new CompletableFuture<>(); | ||
split.headers().handle((responseHeaders, cause) -> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Question) If endpoint selection fails and HttpResponse.ofFailure(cause)
is returned, is it possible that headersFuture
is never triggered and thus result
is never completed?
I actually think even actualSessionProtocol(ctx)
can fail if ctx
is never created.
return abortRequestAndReturnFailureResponse(req, new IllegalArgumentException( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If HttpResponse.ofFailure(cause)
is returned, then
SplitHttpMessageSubscriber.onError()
is called:armeria/core/src/main/java/com/linecorp/armeria/internal/common/SplitHttpMessageSubscriber.java
Line 259 in f0818b9
doOnCompletion(cause); DefaultSplitHttpResponse.doOnCompletion
is called:armeria/core/src/main/java/com/linecorp/armeria/internal/common/DefaultSplitHttpResponse.java
Line 102 in 9301b1c
protected void doOnCompletion(@Nullable Throwable cause) {
Thus, headersFuture
is complete exceptionally. Did you mean something else?
I actually think even actualSessionProtocol(ctx) can fail if ctx is never created.
actualSessionProtocol(ctx)
is invoked only when the client receives the ResponseHeaders
from the server. if ctx
is never created and HttpResponse.ofFailure(cause);
is returned, split.headers().handle((responseHeaders, cause) -> {...}
will be called directly with the cause.
new DefaultClientRequestContext(this, id, req, rpcReq, endpoint, null, | ||
sessionProtocol(), newHeaders.method(), reqTarget); | ||
// TODO(minwoox): Consider adding serizalizationFormat to ClientRequestContext constructor. | ||
ctx.logBuilder().serializationFormat(log().partial().serializationFormat()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Isn't this already being done?
logBuilder.serializationFormat(partial.serializationFormat()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oops, let me revert it. 😓 Thanks!
core/src/main/java/com/linecorp/armeria/client/websocket/DefaultWebSocketClient.java
Outdated
Show resolved
Hide resolved
core/src/main/java/com/linecorp/armeria/client/websocket/WebSocketClient.java
Show resolved
Hide resolved
SerializationFormat serializationFormat(); | ||
|
||
/** | ||
* Returns the explicit {@link SessionProtocol} of this {@link HttpSession} uses. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
* Returns the explicit {@link SessionProtocol} of this {@link HttpSession} uses. | |
* Returns the explicit {@link SessionProtocol} of this {@link HttpSession}. |
super(delegate, eventLoop, ctx, responseTimeoutMillis, maxContentLength); | ||
final ClientRequestContextExtension extension = ctx.as(ClientRequestContextExtension.class); | ||
if (extension != null) { | ||
extension.setClosingResponseTask(cause -> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think in the future, we may also incorporate websocket ping/pong behavior to KeepAliveHandler
which may require a more extensive callback handler.
At the moment since the callback isn't exposed, I'm fine with either way.
/** | ||
* Sets the {@link Consumer} which will be invoked when the response is closed. | ||
*/ | ||
void setClosingResponseTask(Consumer<@Nullable Throwable> closingResponseTask); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What do you think of adding the word websocket
in the naming so that we don't have to look up what this does every time?
i.e. webSocketResponseCloseTask
or onWebSocketCloseFrame
/** | ||
* Sets the {@link WebSocket} which is used to send WebSocket frames to the server. | ||
*/ | ||
public boolean setOutbound(Publisher<? extends WebSocketFrame> outbound) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we use set
in our setters instead of the record style?
public boolean setOutbound(Publisher<? extends WebSocketFrame> outbound) { | |
public boolean outbound(Publisher<? extends WebSocketFrame> outbound) { |
Also, do you think it would be confusing to add a shortcut in case users just want to write directly?
val writer = WebsocketSession.outbound() // creates a writer internally and sets to `outboundFuture`
writer.write("hi")
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we use set in our setters instead of the record style?
I believe so and this is one of examples:
armeria/core/src/main/java/com/linecorp/armeria/client/ClientRequestContext.java
Lines 307 to 323 in 7da39b8
/** | |
* Returns the amount of time allowed until the initial write attempt of the current {@link Request} | |
* succeeds. This value is initially set from {@link ClientOptions#WRITE_TIMEOUT_MILLIS}. | |
*/ | |
long writeTimeoutMillis(); | |
/** | |
* Sets the amount of time allowed until the initial write attempt of the current {@link Request} | |
* succeeds. This value is initially set from {@link ClientOptions#WRITE_TIMEOUT_MILLIS}. | |
*/ | |
void setWriteTimeoutMillis(long writeTimeoutMillis); | |
/** | |
* Sets the amount of time allowed until the initial write attempt of the current {@link Request} | |
* succeeds. This value is initially set from {@link ClientOptions#WRITE_TIMEOUT_MILLIS}. | |
*/ | |
void setWriteTimeout(Duration writeTimeout); |
, do you think it would be confusing to add a shortcut in case users just want to write directly?
That's a good suggestion. 😉
|
||
/** | ||
* A WebSocket session that is created after {@link WebSocketClient#connect(String)} succeeds. | ||
* You can start sending {@link WebSocketFrame}s via {@link #setOutbound(WebSocket)}. You can also subscribe to |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
* You can start sending {@link WebSocketFrame}s via {@link #setOutbound(WebSocket)}. You can also subscribe to | |
* You can start sending {@link WebSocketFrame}s via {@link #setOutbound(Publisher)}. You can also subscribe to |
return; | ||
} | ||
// Do not close the response right away to give a chance to WebSocketFrameDecoder to close the | ||
// response normally if it receives a close frame. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've been staring at this piece of code for 30min just to remember why ClosedSessionException
was excluded 😅
// response normally if it receives a close frame. | |
// response normally if it receives a close frame instead of completing | |
// exceptionally due to a ClosedSessionException. |
Codecov ReportPatch coverage:
Additional details and impacted files@@ Coverage Diff @@
## main #4972 +/- ##
============================================
- Coverage 74.30% 74.21% -0.10%
- Complexity 19600 19801 +201
============================================
Files 1682 1698 +16
Lines 72260 73091 +831
Branches 9242 9357 +115
============================================
+ Hits 53695 54241 +546
- Misses 14222 14417 +195
- Partials 4343 4433 +90
☔ View full report in Codecov by Sentry. |
|
||
import io.netty.channel.Channel; | ||
|
||
abstract class AbstractHttpRequestSubscriber extends AbstractHttpRequestHandler |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The logic in this class isn't changed.
HttpRequestSubscriber
is renamed to this AbstractHttpRequestSubscriber
class and the new HttpRequestSubscriber
extends this class.
import io.netty.util.collection.IntObjectHashMap; | ||
import io.netty.util.collection.IntObjectMap; | ||
|
||
abstract class AbstractHttpResponseDecoder implements HttpResponseDecoder { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
HttpResponseDecoder
now becomes AbstractHttpResponseDecoder
and the new interface HttpResponseDecoder
is added for WebSocketHttp1ClientChannelHandler
.
super(channel, InboundTrafficController.ofHttp1(channel)); | ||
final long idleTimeoutMillis = clientFactory.idleTimeoutMillis(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Moved the logic for creating keepAliveHandler
into this constructor and keepAliveHandler
is not nullable anymore.
pool.acquireLater(protocol, key, timingsBuilder).handle((newPooledChannel, cause) -> { | ||
logSession(ctx, newPooledChannel, timingsBuilder.build()); | ||
if (cause == null) { | ||
doExecute(newPooledChannel, ctx, req, res); | ||
} else { | ||
final UnprocessedRequestException wrapped = UnprocessedRequestException.of(cause); | ||
handleEarlyRequestException(ctx, req, wrapped); | ||
res.close(wrapped); | ||
} | ||
return null; | ||
}); | ||
pool.acquireLater(protocol, serializationFormat, key, timingsBuilder) | ||
.handle((newPooledChannel, cause) -> { | ||
logSession(ctx, newPooledChannel, timingsBuilder.build()); | ||
if (cause == null) { | ||
doExecute(newPooledChannel, ctx, req, res); | ||
} else { | ||
final UnprocessedRequestException wrapped = UnprocessedRequestException.of(cause); | ||
handleEarlyRequestException(ctx, req, wrapped); | ||
res.close(wrapped); | ||
} | ||
return null; | ||
}); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Adding serializationFormat
to acquireLater
method is the only change.
import io.netty.channel.EventLoop; | ||
import io.netty.util.concurrent.EventExecutor; | ||
|
||
class HttpResponseWrapper implements StreamWriter<HttpObject> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This class was in HttpResponseDecoder
.
close
method is a bit changed to support WebSocket.
import io.netty.handler.codec.http.HttpUtil; | ||
import io.netty.util.ReferenceCountUtil; | ||
|
||
final class WebSocketHttp1ClientChannelHandler extends ChannelDuplexHandler implements HttpResponseDecoder { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This class is used instead of Http1ResponseDecoder
for WebSocket.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Optional) WebSocketHttp1ClientChannelHandler
could be split into WebSocketHttp1UpgradleHandler
and WebSocketHttp1ResponseHandler
to abide by SRP in future work.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I thought it's okay in terms of handling the series of WebSocket responses. Will consider splitting it once the logic becomes complicated. 😉
core/src/main/java/com/linecorp/armeria/client/AbstractHttpRequestSubscriber.java
Outdated
Show resolved
Hide resolved
core/src/main/java/com/linecorp/armeria/client/AbstractWebClientBuilder.java
Outdated
Show resolved
Hide resolved
core/src/main/java/com/linecorp/armeria/client/Http1ResponseDecoder.java
Outdated
Show resolved
Hide resolved
core/src/main/java/com/linecorp/armeria/client/HttpSessionHandler.java
Outdated
Show resolved
Hide resolved
core/src/main/java/com/linecorp/armeria/client/HttpSessionHandler.java
Outdated
Show resolved
Hide resolved
core/src/main/java/com/linecorp/armeria/client/WebSocketHttp1ClientChannelHandler.java
Outdated
Show resolved
Hide resolved
core/src/test/java/com/linecorp/armeria/client/Http1ResponseDecoderTest.java
Outdated
Show resolved
Hide resolved
a0684e3
to
2c690e8
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
core/src/main/java/com/linecorp/armeria/client/WebSocketHttp1ClientChannelHandler.java
Outdated
Show resolved
Hide resolved
} | ||
if (!res.tryWriteData(HttpData.wrap(data.retain()))) { | ||
ctx.close(); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Missing break
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed. 😉
if (serializationFormat == SerializationFormat.WS) { | ||
responseDecoder = ctx.pipeline().get(WebSocketHttp1ClientChannelHandler.class); | ||
keepAliveHandler = responseDecoder.keepAliveHandler(); | ||
} else { | ||
keepAliveHandler = new NoopKeepAliveHandler(); | ||
final Http1ResponseDecoder http1ResponseDecoder = | ||
ctx.pipeline().get(Http1ResponseDecoder.class); | ||
http1ResponseDecoder.maybeInitializeKeepAliveHandler(ctx); | ||
responseDecoder = http1ResponseDecoder; | ||
keepAliveHandler = http1ResponseDecoder.keepAliveHandler(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Q) Could we generalize this? I see the only difference is maybeInitializeKeepAliveHandler(ctx)
which might do nothing in WebSocketHttp1ClientChannelHandler
.
final HttpResponseDecoder responseDecoder =
ctx.pipeline().get(HttpResponseDecoder.class);
// Noop for WebSocketHttp1ClientChannelHandler
responseDecoder.maybeInitializeKeepAliveHandler(ctx);
keepAliveHandler = responseDecoder.keepAliveHandler();
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changed to call keepAliveHandler.initialize(ctx);
directly. 😉
final AbstractHttpRequestSubscriber subscriber; | ||
if (serializationFormat == SerializationFormat.WS) { | ||
if (protocol.isExplicitHttp1()) { | ||
subscriber = new WebSocketHttp1RequestSubscriber( | ||
channel, requestEncoder, responseDecoder, req, res, ctx, writeTimeoutMillis); | ||
} else { | ||
assert protocol.isExplicitHttp2(); | ||
subscriber = new WebSocketHttp2RequestSubscriber( | ||
channel, requestEncoder, responseDecoder, req, res, ctx, writeTimeoutMillis); | ||
} | ||
} else { | ||
subscriber = new HttpRequestSubscriber( | ||
channel, requestEncoder, responseDecoder, req, res, ctx, writeTimeoutMillis); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: What do you think of adding a factory method to AbstractHttpRequestSubscriber
to encapsulate the dispatch?
AbstractHttpRequestSubscriber subscriber =
AbstractHttpRequestSubscriber.of(channel, requestEncoder, responseDecoder, req, res, ctx, writeTimeoutMillis);
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice abstraction. 👍
final StringBuilder buf = tempThreadLocals.stringBuilder(); | ||
buf.append("unexpected message type: " + msg.getClass().getName() + | ||
" (expected: " + HttpResponse.class.getName() + ", channel: " + ctx.channel() + ')'); | ||
fail(ctx, new ProtocolViolationException(buf.toString())); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we call fail()
after the try-with-resource block because res.close(cause)
in fail()
can invoke callbacks? It is difficult to statically analyze what will be called.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's a good point. 👍
Also, fix it in Http1ResponseDecoder
.
@@ -91,7 +104,7 @@ public final class SerializationFormat implements Comparable<SerializationFormat | |||
private static SerializationFormat register( | |||
BiMap<String, SerializationFormat> uriTextToFormats, | |||
Multimap<MediaType, SerializationFormat> simplifiedMediaTypeToFormats, | |||
SerializationFormatProvider.Entry entry) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Revert?
return !session().isAcquirable() && !hasUnfinishedResponses(); | ||
} | ||
|
||
static Exception contentTooLargeException(HttpResponseWrapper res, long transferred) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit:
static Exception contentTooLargeException(HttpResponseWrapper res, long transferred) { | |
static ContentTooLargeException contentTooLargeException(HttpResponseWrapper res, | |
long transferred) { |
if (requestAutoAbortDelayMillis == 0) { | ||
request.abort(ResponseCompleteException.get()); | ||
return; | ||
} | ||
if (requestAutoAbortDelayMillis > 0 && | ||
requestAutoAbortDelayMillis < Long.MAX_VALUE) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: Would be clear to use a single if...else
chain since it only sees requestAutoAbortDelayMillis
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It doesn't do anything when the value is negative or MAX_VALUE
. Let me use the early-return style. 😉
import io.netty.handler.codec.http.HttpUtil; | ||
import io.netty.util.ReferenceCountUtil; | ||
|
||
final class WebSocketHttp1ClientChannelHandler extends ChannelDuplexHandler implements HttpResponseDecoder { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Optional) WebSocketHttp1ClientChannelHandler
could be split into WebSocketHttp1UpgradleHandler
and WebSocketHttp1ResponseHandler
to abide by SRP in future work.
@@ -92,6 +92,19 @@ public static RequestHeaders mergeRequestHeaders(RequestHeaders headers, | |||
headers.contains(HttpHeaderNames.USER_AGENT)) { | |||
return headers; | |||
} | |||
if (defaultHeaders.isEmpty() && additionalHeaders.isEmpty()) { | |||
boolean containInternalHeaders = true; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit:
boolean containInternalHeaders = true; | |
boolean containsAllInternalHeaders = true; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks, @minwoox 🚀🚀
public void setOutbound(Publisher<? extends WebSocketFrame> outbound) { | ||
requireNonNull(outbound, "outbound"); | ||
if (outboundFuture.isDone()) { | ||
throw new IllegalStateException("outbound() or setOutbound() has been already called."); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we call outbound.abort()
if outbound
is an instance of StreamMessage
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, we need it. Thanks! 😉
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍 👍 👍
Motivation
It would be nice if we also support WebSocket clients.
Modifications
SerializationFormat.WS
for WebSocket.WebSocketClient
and its builder.HttpRequestSubciber
toAbsractHttpRequestSubciber
and add
WebSocketHttp1RequestSubscriber
ClientOpions.AUTO_FILL_ORIGIN_HEADER
for adding the header automatically.Result
WebSocketClient
.To-Do
WebSocketClientEventHandler