Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add WebSocketClient #4972

Merged
merged 31 commits into from
Aug 22, 2023
Merged

Add WebSocketClient #4972

merged 31 commits into from
Aug 22, 2023

Conversation

minwoox
Copy link
Contributor

@minwoox minwoox commented Jun 20, 2023

Motivation
It would be nice if we also support WebSocket clients.

Modifications

  • Add SerializationFormat.WS for WebSocket.
  • Add WebSocketClient and its builder.
  • Extract the common part of HttpRequestSubciber to AbsractHttpRequestSubciber
    and add WebSocketHttp1RequestSubscriber
  • Add ClientOpions.AUTO_FILL_ORIGIN_HEADER for adding the header automatically.
  • Add the pipeline Channel handler for WebSocket

Result

  • You can now send and receive WebSocket frames using WebSocketClient.

To-Do

  • Add WebSocketClientEventHandler
  • A lot of todos that are in this PR

@minwoox minwoox added this to the 1.25.0 milestone Jun 20, 2023
@minwoox minwoox changed the title [WIP] Add WebSocketClient Add WebSocketClient Jun 30, 2023
@minwoox minwoox mentioned this pull request Jun 30, 2023
RequestHeaders headers = request.headers();
final HttpSession session = HttpSession.get(channel());
final SerializationFormat serializationFormat = session.serializationFormat();
if (serializationFormat == SerializationFormat.WS) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

HttpSession is determined before starting subscription.We may add WebSocketHttp2RequestSubscriber and move this specialized logic there?

import com.linecorp.armeria.common.RequestHeaders;

class WebSocketHttp2RequestSubscriber {
    RequestHeaders mapHeaders(RequestHeaders headers) {
        headers
                .toBuilder()
                .method(HttpMethod.CONNECT)
                ...
    }
}

@@ -347,6 +359,10 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exc
this.protocol = protocol;
if (protocol == H1 || protocol == H1C) {
final Http1ResponseDecoder responseDecoder = ctx.pipeline().get(Http1ResponseDecoder.class);
final boolean webSocket = serializationFormat == SerializationFormat.WS;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Q: Would it be possible to replace Http1ResponseDecoder with WebSocketResponseDecoder to modulize the WebSocket domain?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Split it. PTAL. 😉
a94f576

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought we could completely replace Http1ResponseDecoder. The current implementation looks good.

This PR is already huge. If necessary, additional modularization could be taken in the future.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed to use WebSocketHttp1ClientChannelHandler instead. PTAL. 😉

import com.linecorp.armeria.server.websocket.WebSocketServiceHandler;
import com.linecorp.armeria.testing.junit5.server.ServerExtension;

class WebSocketClientTest {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we also test against Jetty's WebSocket server? We could add it to :it:websocket.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You have caught my laziness. 🤣

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤣

@@ -347,6 +359,10 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exc
this.protocol = protocol;
if (protocol == H1 || protocol == H1C) {
final Http1ResponseDecoder responseDecoder = ctx.pipeline().get(Http1ResponseDecoder.class);
final boolean webSocket = serializationFormat == SerializationFormat.WS;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought we could completely replace Http1ResponseDecoder. The current implementation looks good.

This PR is already huge. If necessary, additional modularization could be taken in the future.

return;
}
// Do not close the response right away to give a chance to WebSocketFrameDecoder to close the
// response normally if it receives a close frame.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I remember we had a chat about this behavior before. If we close the response instead of abortion, we don't need the delay, right?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't remember the conclusion of our discussion either 😅
Did we decide to close the connection from server-side? or did we decide to strictly follow the websocket rfc and close from client-side?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed to close the delegate directly. 😉

Did we decide to close the connection from server-side? or did we decide to strictly follow the websocket rfc and close from client-side?

Because we can't control and guarantee that, we just close the connection after close frames are sent and received in both sides. 😉

super(delegate, eventLoop, ctx, responseTimeoutMillis, maxContentLength);
final ClientRequestContextExtension extension = ctx.as(ClientRequestContextExtension.class);
if (extension != null) {
extension.setClosingResponseTask(cause -> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Technically, this is a setter method that sets a Consumer to ctx.
For those usage, I think we may instead use attributes to store the Consumer.
What do you think?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think in the future, we may also incorporate websocket ping/pong behavior to KeepAliveHandler which may require a more extensive callback handler.

At the moment since the callback isn't exposed, I'm fine with either way.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed to use attribute. BTW, I didn't change it for the server side because the original request might be used by others. Please, let me know if you think I need to change it as well. 😉

Copy link
Contributor

@jrhee17 jrhee17 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mostly looks good! Got through roughly 2/3 😅 Will take a look at the rest tomorrow


@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
keepAliveHandler.onReadOrWrite();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note: KeepAliveHandler at the moment won't support ping/pong

return;
}
// Do not close the response right away to give a chance to WebSocketFrameDecoder to close the
// response normally if it receives a close frame.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't remember the conclusion of our discussion either 😅
Did we decide to close the connection from server-side? or did we decide to strictly follow the websocket rfc and close from client-side?

@@ -65,6 +65,9 @@ public enum SessionProtocol {

private static final Set<SessionProtocol> HTTPS_VALUES = Sets.immutableEnumSet(HTTPS, H1, H2);

private static final Set<SessionProtocol> HTTP_AND_HTTPS_VALUES =
Sets.immutableEnumSet(HTTPS, HTTP, H1, H1C, H2, H2C);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
Sets.immutableEnumSet(HTTPS, HTTP, H1, H1C, H2, H2C);
Sets.union(HTTP_VALUES, HTTPS_VALUES);

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sets.union has unnecessary operations inside its method. For example, it checks if each set contains the element from the other set in size() method:

public int size() {
  int size = set1.size();
  for (E e : set2) {
    if (!set1.contains(e)) {
      size++;
    }
  }
  return size;
}

So let me just use the current logic. 😉

} else if (SessionProtocol.HTTPS == p) {
schemes.put("wss", scheme);
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Question) Why can't this be outside of the double for loop?

schemes.put("ws", new Scheme(SerializationFormat.WS, SessionProtocol.HTTP));
schemes.put("wss", new Scheme(SerializationFormat.WS, SessionProtocol.HTTPS));

Copy link
Contributor Author

@minwoox minwoox Aug 10, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That will create two new Scheme(SerializationFormat.WS, SessionProtocol.HTTP) instances whose references are different and it wouldn't work if we use == instead of scheme.equals(others).

if (internalHeaders.isEmpty()) {
return headers;
}
if (internalHeaders.size() == 1 && internalHeaders.contains(HttpHeaderNames.USER_AGENT) &&
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks like extra overhead since I think most requests will contain

  • internalHeaders.authority
  • internalHeaders.scheme

What was the motivation of this change?

Copy link
Contributor Author

@minwoox minwoox Aug 10, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That is correct! 👍 Let me just revert it and leave the following if condition. 😉


final class DefaultWebSocketClient implements WebSocketClient {

static final WebSocketClient DEFAULT = new DefaultWebSocketClient();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why can't we just use the builder?

Suggested change
static final WebSocketClient DEFAULT = new DefaultWebSocketClient();
static final WebSocketClient DEFAULT = new WebSocketClientBuilder(UNDEFINED_URI).build()

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should've pushed my change before you left this comment. 🤣

@@ -438,7 +440,7 @@ private void failEarly(Throwable cause) {
}

// TODO(ikhoon): Consider moving the logic for filling authority to `HttpClientDelegate.exceute()`.
private void autoFillSchemeAndAuthority() {
private void autoFillSchemeAuthorityAndOrigin() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While double checking, it seems like the rfc specifies origin as optional.

  1. Optionally, an |Origin| header field. This header field is sent
    by all browser clients. A connection attempt lacking this
    header field SHOULD NOT be interpreted as coming from a browser
    client.

I'm fine with this change if this is important for compatibility with other server implementations, but I wanted to double-check if we want to add this header by default.

ref: https://datatracker.ietf.org/doc/html/rfc6455#section-4.2.1

Copy link
Contributor Author

@minwoox minwoox Aug 10, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because we have decided to reject a WebSocket request if the origin header is missing from the WebSocketService, I think we should send the origin header by default for the client. 😉

Copy link
Contributor

@jrhee17 jrhee17 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Left a couple of comments but mostly looks good 👍

});

final CompletableFuture<WebSocketSession> result = new CompletableFuture<>();
split.headers().handle((responseHeaders, cause) -> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Question) If endpoint selection fails and HttpResponse.ofFailure(cause) is returned, is it possible that headersFuture is never triggered and thus result is never completed?

I actually think even actualSessionProtocol(ctx) can fail if ctx is never created.

return abortRequestAndReturnFailureResponse(req, new IllegalArgumentException(

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If HttpResponse.ofFailure(cause) is returned, then

Thus, headersFuture is complete exceptionally. Did you mean something else?

I actually think even actualSessionProtocol(ctx) can fail if ctx is never created.

actualSessionProtocol(ctx) is invoked only when the client receives the ResponseHeaders from the server. if ctx is never created and HttpResponse.ofFailure(cause); is returned, split.headers().handle((responseHeaders, cause) -> {...} will be called directly with the cause.

new DefaultClientRequestContext(this, id, req, rpcReq, endpoint, null,
sessionProtocol(), newHeaders.method(), reqTarget);
// TODO(minwoox): Consider adding serizalizationFormat to ClientRequestContext constructor.
ctx.logBuilder().serializationFormat(log().partial().serializationFormat());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't this already being done?

logBuilder.serializationFormat(partial.serializationFormat());

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oops, let me revert it. 😓 Thanks!

SerializationFormat serializationFormat();

/**
* Returns the explicit {@link SessionProtocol} of this {@link HttpSession} uses.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
* Returns the explicit {@link SessionProtocol} of this {@link HttpSession} uses.
* Returns the explicit {@link SessionProtocol} of this {@link HttpSession}.

super(delegate, eventLoop, ctx, responseTimeoutMillis, maxContentLength);
final ClientRequestContextExtension extension = ctx.as(ClientRequestContextExtension.class);
if (extension != null) {
extension.setClosingResponseTask(cause -> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think in the future, we may also incorporate websocket ping/pong behavior to KeepAliveHandler which may require a more extensive callback handler.

At the moment since the callback isn't exposed, I'm fine with either way.

/**
* Sets the {@link Consumer} which will be invoked when the response is closed.
*/
void setClosingResponseTask(Consumer<@Nullable Throwable> closingResponseTask);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you think of adding the word websocket in the naming so that we don't have to look up what this does every time?

i.e. webSocketResponseCloseTask or onWebSocketCloseFrame

/**
* Sets the {@link WebSocket} which is used to send WebSocket frames to the server.
*/
public boolean setOutbound(Publisher<? extends WebSocketFrame> outbound) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we use set in our setters instead of the record style?

Suggested change
public boolean setOutbound(Publisher<? extends WebSocketFrame> outbound) {
public boolean outbound(Publisher<? extends WebSocketFrame> outbound) {

Also, do you think it would be confusing to add a shortcut in case users just want to write directly?

val writer = WebsocketSession.outbound() // creates a writer internally and sets to `outboundFuture`
writer.write("hi")

Copy link
Contributor Author

@minwoox minwoox Aug 10, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we use set in our setters instead of the record style?

I believe so and this is one of examples:

/**
* Returns the amount of time allowed until the initial write attempt of the current {@link Request}
* succeeds. This value is initially set from {@link ClientOptions#WRITE_TIMEOUT_MILLIS}.
*/
long writeTimeoutMillis();
/**
* Sets the amount of time allowed until the initial write attempt of the current {@link Request}
* succeeds. This value is initially set from {@link ClientOptions#WRITE_TIMEOUT_MILLIS}.
*/
void setWriteTimeoutMillis(long writeTimeoutMillis);
/**
* Sets the amount of time allowed until the initial write attempt of the current {@link Request}
* succeeds. This value is initially set from {@link ClientOptions#WRITE_TIMEOUT_MILLIS}.
*/
void setWriteTimeout(Duration writeTimeout);
😉

, do you think it would be confusing to add a shortcut in case users just want to write directly?

That's a good suggestion. 😉


/**
* A WebSocket session that is created after {@link WebSocketClient#connect(String)} succeeds.
* You can start sending {@link WebSocketFrame}s via {@link #setOutbound(WebSocket)}. You can also subscribe to
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
* You can start sending {@link WebSocketFrame}s via {@link #setOutbound(WebSocket)}. You can also subscribe to
* You can start sending {@link WebSocketFrame}s via {@link #setOutbound(Publisher)}. You can also subscribe to

return;
}
// Do not close the response right away to give a chance to WebSocketFrameDecoder to close the
// response normally if it receives a close frame.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've been staring at this piece of code for 30min just to remember why ClosedSessionException was excluded 😅

Suggested change
// response normally if it receives a close frame.
// response normally if it receives a close frame instead of completing
// exceptionally due to a ClosedSessionException.

@codecov
Copy link

codecov bot commented Aug 10, 2023

Codecov Report

Patch coverage: 70.18% and project coverage change: -0.10% ⚠️

Comparison is base (1ce4c69) 74.30% compared to head (ec847a4) 74.21%.
Report is 4 commits behind head on main.

Additional details and impacted files
@@             Coverage Diff              @@
##               main    #4972      +/-   ##
============================================
- Coverage     74.30%   74.21%   -0.10%     
- Complexity    19600    19801     +201     
============================================
  Files          1682     1698      +16     
  Lines         72260    73091     +831     
  Branches       9242     9357     +115     
============================================
+ Hits          53695    54241     +546     
- Misses        14222    14417     +195     
- Partials       4343     4433      +90     
Files Changed Coverage Δ
...ia/client/BlockingWebClientRequestPreparation.java 38.61% <0.00%> (-0.79%) ⬇️
...a/client/FutureTransformingRequestPreparation.java 29.68% <0.00%> (-0.48%) ⬇️
.../linecorp/armeria/client/Http2ResponseDecoder.java 72.46% <ø> (ø)
...m/linecorp/armeria/client/HttpResponseDecoder.java 0.00% <ø> (-79.42%) ⬇️
...linecorp/armeria/client/RestClientPreparation.java 38.63% <0.00%> (-0.90%) ⬇️
...armeria/client/TransformingRequestPreparation.java 16.16% <0.00%> (-0.34%) ⬇️
...rp/armeria/client/WebClientRequestPreparation.java 72.47% <0.00%> (-0.68%) ⬇️
...t/websocket/WebSocketClientHandshakeException.java 0.00% <0.00%> (ø)
...orp/armeria/common/AbstractHttpMessageBuilder.java 73.68% <0.00%> (-5.57%) ⬇️
...om/linecorp/armeria/common/HttpRequestBuilder.java 87.50% <0.00%> (-2.83%) ⬇️
... and 51 more

... and 29 files with indirect coverage changes

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.


import io.netty.channel.Channel;

abstract class AbstractHttpRequestSubscriber extends AbstractHttpRequestHandler
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The logic in this class isn't changed.
HttpRequestSubscriber is renamed to this AbstractHttpRequestSubscriber class and the new HttpRequestSubscriber extends this class.

import io.netty.util.collection.IntObjectHashMap;
import io.netty.util.collection.IntObjectMap;

abstract class AbstractHttpResponseDecoder implements HttpResponseDecoder {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

HttpResponseDecoder now becomes AbstractHttpResponseDecoder and the new interface HttpResponseDecoder is added for WebSocketHttp1ClientChannelHandler.

super(channel, InboundTrafficController.ofHttp1(channel));
final long idleTimeoutMillis = clientFactory.idleTimeoutMillis();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moved the logic for creating keepAliveHandler into this constructor and keepAliveHandler is not nullable anymore.

Comment on lines -178 to +191
pool.acquireLater(protocol, key, timingsBuilder).handle((newPooledChannel, cause) -> {
logSession(ctx, newPooledChannel, timingsBuilder.build());
if (cause == null) {
doExecute(newPooledChannel, ctx, req, res);
} else {
final UnprocessedRequestException wrapped = UnprocessedRequestException.of(cause);
handleEarlyRequestException(ctx, req, wrapped);
res.close(wrapped);
}
return null;
});
pool.acquireLater(protocol, serializationFormat, key, timingsBuilder)
.handle((newPooledChannel, cause) -> {
logSession(ctx, newPooledChannel, timingsBuilder.build());
if (cause == null) {
doExecute(newPooledChannel, ctx, req, res);
} else {
final UnprocessedRequestException wrapped = UnprocessedRequestException.of(cause);
handleEarlyRequestException(ctx, req, wrapped);
res.close(wrapped);
}
return null;
});
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Adding serializationFormat to acquireLater method is the only change.

import io.netty.channel.EventLoop;
import io.netty.util.concurrent.EventExecutor;

class HttpResponseWrapper implements StreamWriter<HttpObject> {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This class was in HttpResponseDecoder.
close method is a bit changed to support WebSocket.

import io.netty.handler.codec.http.HttpUtil;
import io.netty.util.ReferenceCountUtil;

final class WebSocketHttp1ClientChannelHandler extends ChannelDuplexHandler implements HttpResponseDecoder {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This class is used instead of Http1ResponseDecoder for WebSocket.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Optional) WebSocketHttp1ClientChannelHandler could be split into WebSocketHttp1UpgradleHandler and WebSocketHttp1ResponseHandler to abide by SRP in future work.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought it's okay in terms of handling the series of WebSocket responses. Will consider splitting it once the logic becomes complicated. 😉

Copy link
Member

@trustin trustin left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Left two nits but:
NASA LGTM

}
if (!res.tryWriteData(HttpData.wrap(data.retain()))) {
ctx.close();
}
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing break?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed. 😉

Comment on lines 361 to 369
if (serializationFormat == SerializationFormat.WS) {
responseDecoder = ctx.pipeline().get(WebSocketHttp1ClientChannelHandler.class);
keepAliveHandler = responseDecoder.keepAliveHandler();
} else {
keepAliveHandler = new NoopKeepAliveHandler();
final Http1ResponseDecoder http1ResponseDecoder =
ctx.pipeline().get(Http1ResponseDecoder.class);
http1ResponseDecoder.maybeInitializeKeepAliveHandler(ctx);
responseDecoder = http1ResponseDecoder;
keepAliveHandler = http1ResponseDecoder.keepAliveHandler();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Q) Could we generalize this? I see the only difference is maybeInitializeKeepAliveHandler(ctx) which might do nothing in WebSocketHttp1ClientChannelHandler.

final HttpResponseDecoder responseDecoder =
        ctx.pipeline().get(HttpResponseDecoder.class);
// Noop for WebSocketHttp1ClientChannelHandler
responseDecoder.maybeInitializeKeepAliveHandler(ctx);
keepAliveHandler = responseDecoder.keepAliveHandler();

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed to call keepAliveHandler.initialize(ctx); directly. 😉

Comment on lines 225 to 238
final AbstractHttpRequestSubscriber subscriber;
if (serializationFormat == SerializationFormat.WS) {
if (protocol.isExplicitHttp1()) {
subscriber = new WebSocketHttp1RequestSubscriber(
channel, requestEncoder, responseDecoder, req, res, ctx, writeTimeoutMillis);
} else {
assert protocol.isExplicitHttp2();
subscriber = new WebSocketHttp2RequestSubscriber(
channel, requestEncoder, responseDecoder, req, res, ctx, writeTimeoutMillis);
}
} else {
subscriber = new HttpRequestSubscriber(
channel, requestEncoder, responseDecoder, req, res, ctx, writeTimeoutMillis);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: What do you think of adding a factory method to AbstractHttpRequestSubscriber to encapsulate the dispatch?

AbstractHttpRequestSubscriber subscriber =
    AbstractHttpRequestSubscriber.of(channel, requestEncoder, responseDecoder, req, res, ctx, writeTimeoutMillis);

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice abstraction. 👍

final StringBuilder buf = tempThreadLocals.stringBuilder();
buf.append("unexpected message type: " + msg.getClass().getName() +
" (expected: " + HttpResponse.class.getName() + ", channel: " + ctx.channel() + ')');
fail(ctx, new ProtocolViolationException(buf.toString()));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we call fail() after the try-with-resource block because res.close(cause) in fail() can invoke callbacks? It is difficult to statically analyze what will be called.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a good point. 👍
Also, fix it in Http1ResponseDecoder.

@@ -91,7 +104,7 @@ public final class SerializationFormat implements Comparable<SerializationFormat
private static SerializationFormat register(
BiMap<String, SerializationFormat> uriTextToFormats,
Multimap<MediaType, SerializationFormat> simplifiedMediaTypeToFormats,
SerializationFormatProvider.Entry entry) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Revert?

return !session().isAcquirable() && !hasUnfinishedResponses();
}

static Exception contentTooLargeException(HttpResponseWrapper res, long transferred) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit:

Suggested change
static Exception contentTooLargeException(HttpResponseWrapper res, long transferred) {
static ContentTooLargeException contentTooLargeException(HttpResponseWrapper res,
long transferred) {

Comment on lines 218 to 223
if (requestAutoAbortDelayMillis == 0) {
request.abort(ResponseCompleteException.get());
return;
}
if (requestAutoAbortDelayMillis > 0 &&
requestAutoAbortDelayMillis < Long.MAX_VALUE) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Would be clear to use a single if...else chain since it only sees requestAutoAbortDelayMillis?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It doesn't do anything when the value is negative or MAX_VALUE. Let me use the early-return style. 😉

import io.netty.handler.codec.http.HttpUtil;
import io.netty.util.ReferenceCountUtil;

final class WebSocketHttp1ClientChannelHandler extends ChannelDuplexHandler implements HttpResponseDecoder {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Optional) WebSocketHttp1ClientChannelHandler could be split into WebSocketHttp1UpgradleHandler and WebSocketHttp1ResponseHandler to abide by SRP in future work.

@@ -92,6 +92,19 @@ public static RequestHeaders mergeRequestHeaders(RequestHeaders headers,
headers.contains(HttpHeaderNames.USER_AGENT)) {
return headers;
}
if (defaultHeaders.isEmpty() && additionalHeaders.isEmpty()) {
boolean containInternalHeaders = true;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit:

Suggested change
boolean containInternalHeaders = true;
boolean containsAllInternalHeaders = true;

Copy link
Contributor

@ikhoon ikhoon left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, @minwoox 🚀🚀

public void setOutbound(Publisher<? extends WebSocketFrame> outbound) {
requireNonNull(outbound, "outbound");
if (outboundFuture.isDone()) {
throw new IllegalStateException("outbound() or setOutbound() has been already called.");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we call outbound.abort() if outbound is an instance of StreamMessage?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, we need it. Thanks! 😉

Copy link
Contributor

@jrhee17 jrhee17 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 👍 👍

@jrhee17 jrhee17 merged commit dc2bdc6 into line:main Aug 22, 2023
@minwoox minwoox deleted the WebSocketClient branch August 22, 2023 05:30
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants