Skip to content

Commit

Permalink
Issue #12266 - InvocationType improvements and cleanups. (#12596)
Browse files Browse the repository at this point in the history
* This is the work for the client-side.
  Now the `InvocationType` can be specified at the `HttpClientTransport` level.
  Wired the `InvocationType` also for `Response.ContentSourceListener`, so that for applications that read response content using `Content.Source` and specify an `InvocationType` to `demand(Runnable)`, the implementation will honor it.
* Renamed HttpClient.getTransport() to getHttpClientTransport().
* Fixed FCGI parsing: onResponseHeaders() was called multiple times in case of content not yet arrived.
* Fixed race condition when notifying HTTP/2 `HeadersFrame`s.
  Before, `Stream.Listener.onHeaders()` was assuming that the headers were processed synchronously.
  Now, `HttpReceiverOverHTTP2` process them asynchronously, with a task that declares an invocation type.
  This was causing a race between the task and the code present after the call to `onHeaders()`.
* Introduced `Stream.Listener.onHeaders(Stream, HeadersFrame, Callback)` to allow asynchronous processing of `HeadersFrame`s.
* Fixed `IteratingCallback` and `HTTP2Flusher` to use `tryLock()` in `toString()` to avoid deadlocks.
* Fixed HTTP/2 serialization in HttpReceiverOverHTTP2. Fixed reset race in HTTP2Stream.
* Fixed handling of HTTP upgrade in CoreClientUpgradeRequest.
  For HTTP/2, the response may have arrived, but the exchange failed (due to the request being failed); in this case, we want to notify an UpgradeException, not a generic Exception.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
  • Loading branch information
sbordet authored Jan 7, 2025
1 parent c2a4d50 commit 4939a8b
Show file tree
Hide file tree
Showing 65 changed files with 1,000 additions and 529 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package org.eclipse.jetty.client;

import java.util.Map;
import java.util.Objects;

import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.annotation.ManagedObject;
Expand All @@ -28,6 +29,7 @@ public abstract class AbstractHttpClientTransport extends ContainerLifeCycle imp

private HttpClient client;
private ConnectionPool.Factory factory;
private InvocationType invocationType = InvocationType.BLOCKING;

protected HttpClient getHttpClient()
{
Expand Down Expand Up @@ -60,4 +62,16 @@ protected void connectFailed(Map<String, Object> context, Throwable failure)
Promise<Connection> promise = (Promise<Connection>)context.get(HTTP_CONNECTION_PROMISE_CONTEXT_KEY);
promise.failed(failure);
}

@Override
public InvocationType getInvocationType()
{
return invocationType;
}

@Override
public void setInvocationType(InvocationType invocationType)
{
this.invocationType = Objects.requireNonNull(invocationType);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -136,13 +136,6 @@ public void onComplete(Result result)
{
HttpRequest request = (HttpRequest)result.getRequest();
ContentResponse response = new HttpContentResponse(result.getResponse(), getContent(), getMediaType(), getEncoding());
if (result.getResponseFailure() != null)
{
if (LOG.isDebugEnabled())
LOG.debug("Authentication challenge failed", result.getFailure());
forwardFailureComplete(request, result.getRequestFailure(), response, result.getResponseFailure());
return;
}

String authenticationAttribute = getAuthenticationAttribute();
HttpConversation conversation = request.getConversation();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,23 @@

import org.eclipse.jetty.client.transport.HttpDestination;
import org.eclipse.jetty.io.ClientConnectionFactory;
import org.eclipse.jetty.util.thread.Invocable;

/**
* {@link HttpClientTransport} represents what transport implementations should provide
* in order to plug-in a different transport for {@link HttpClient}.
* in order to plug in a different transport for {@link HttpClient}.
* <p>
* While the {@link HttpClient} APIs define the HTTP semantic (request, response, headers, etc.)
* <em>how</em> an HTTP exchange is carried over the network depends on implementations of this class.
* <p>
* The default implementation uses the HTTP protocol to carry over the network the HTTP exchange,
* but the HTTP exchange may also be carried using the FCGI protocol, the HTTP/2 protocol or,
* in future, other protocols.
* in the future, other protocols.
*/
public interface HttpClientTransport extends ClientConnectionFactory, HttpClient.Aware
public interface HttpClientTransport extends ClientConnectionFactory, HttpClient.Aware, Invocable
{
public static final String HTTP_DESTINATION_CONTEXT_KEY = "org.eclipse.jetty.client.destination";
public static final String HTTP_CONNECTION_PROMISE_CONTEXT_KEY = "org.eclipse.jetty.client.connection.promise";
String HTTP_DESTINATION_CONTEXT_KEY = "org.eclipse.jetty.client.destination";
String HTTP_CONNECTION_PROMISE_CONTEXT_KEY = "org.eclipse.jetty.client.connection.promise";

/**
* Sets the {@link HttpClient} instance on this transport.
Expand All @@ -45,15 +46,15 @@ public interface HttpClientTransport extends ClientConnectionFactory, HttpClient
* @param client the {@link HttpClient} that uses this transport.
*/
@Override
public void setHttpClient(HttpClient client);
void setHttpClient(HttpClient client);

/**
* Creates a new Origin with the given request.
*
* @param request the request that triggers the creation of the Origin
* @return an Origin that identifies a destination
*/
public Origin newOrigin(Request request);
Origin newOrigin(Request request);

/**
* Creates a new, transport-specific, {@link HttpDestination} object.
Expand All @@ -64,24 +65,67 @@ public interface HttpClientTransport extends ClientConnectionFactory, HttpClient
* @param origin the destination origin
* @return a new, transport-specific, {@link HttpDestination} object
*/
public Destination newDestination(Origin origin);
Destination newDestination(Origin origin);

/**
* Establishes a physical connection to the given {@code address}.
*
* @param address the address to connect to
* @param context the context information to establish the connection
*/
public void connect(SocketAddress address, Map<String, Object> context);
void connect(SocketAddress address, Map<String, Object> context);

/**
* @return the factory for ConnectionPool instances
*/
public ConnectionPool.Factory getConnectionPoolFactory();
ConnectionPool.Factory getConnectionPoolFactory();

/**
* Set the factory for ConnectionPool instances.
* @param factory the factory for ConnectionPool instances
*/
public void setConnectionPoolFactory(ConnectionPool.Factory factory);
void setConnectionPoolFactory(ConnectionPool.Factory factory);

/**
* @return the {@link InvocationType} associated with this {@code HttpClientTransport}.
* @see #setInvocationType(InvocationType)
*/
@Override
InvocationType getInvocationType();

/**
* <p>Sets the {@link InvocationType} associated with this {@code HttpClientTransport}.</p>
* <p>The values are typically either:
* <ul>
* <li>{@link InvocationType#BLOCKING}, to indicate that response listeners are
* executing blocking code, for example blocking network I/O, JDBC, etc.</li>
* <li>{@link InvocationType#NON_BLOCKING}, to indicate that response listeners
* are executing non-blocking code.</li>
* </ul>
* <p>By default, the value is {@link InvocationType#BLOCKING}.</p>
* <p>A response listener declared to be {@link InvocationType#BLOCKING} incurs
* in one additional context switch, where the NIO processing thread delegates
* the response processing to another thread.
* This ensures that the NIO processing thread can immediately continue with
* other NIO processing activities, if any (for example, processing another
* connection).
* This also means that processing of different connections is parallelized.</p>
* <p>{@link InvocationType#BLOCKING} must be used when you want response
* listeners to be invoked by virtual threads.</p>
* <p>On the other hand, a response listener declared to be
* {@link InvocationType#NON_BLOCKING} does not incur in the additional
* context switch, and therefore it is potentially more efficient.
* However, the processing of different connections is serialized, which
* means that the last connection will be processed only after the previous
* connections (and their respective response listeners) have been processed.</p>
* <p>A response listener declared to be {@link InvocationType#NON_BLOCKING},
* but then executing blocking code, will block the NIO processing performed
* by {@link HttpClient}'s implementation: the current connection and possibly
* other connections will not be further processed, until the blocking response
* listener returns.</p>
*
* @param invocationType the {@link InvocationType} associated with this {@code HttpClientTransport}.
* @see #getInvocationType()
*/
void setInvocationType(InvocationType invocationType);
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,6 @@ public void onComplete(Result result)
{
Request request = result.getRequest();
Response response = result.getResponse();
if (result.getResponseFailure() == null)
redirector.redirect(request, response, null);
else
redirector.fail(request, response, result.getFailure());
redirector.redirect(request, response, null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,10 @@ interface AsyncContentListener extends ContentSourceListener
@Override
default void onContentSource(Response response, Content.Source contentSource)
{
Runnable demandCallback = Invocable.from(Invocable.InvocationType.NON_BLOCKING, () -> onContentSource(response, contentSource));
// Ask the InvocationType to the Content.Source, where applications
// have set it there using Content.Source.demand(Runnable).
Invocable.InvocationType invocationType = Invocable.getInvocationType(contentSource);
Runnable demandCallback = Invocable.from(invocationType, () -> onContentSource(response, contentSource));
Content.Chunk chunk = contentSource.read();
if (chunk == null)
{
Expand Down
Loading

0 comments on commit 4939a8b

Please sign in to comment.