Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix for bug: https://github.com/opensearch-project/OpenSearch/issues/… #3665

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
138 changes: 126 additions & 12 deletions client/rest/src/main/java/org/opensearch/client/RestClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.http.ConnectionClosedException;
import org.apache.http.Header;
import org.apache.http.HttpEntity;
import org.apache.http.entity.HttpEntityWrapper;
import org.apache.http.HttpHost;
import org.apache.http.HttpRequest;
import org.apache.http.HttpResponse;
Expand Down Expand Up @@ -131,6 +132,7 @@ public class RestClient implements Closeable {
private volatile NodeTuple<List<Node>> nodeTuple;
private final WarningsHandler warningsHandler;
private final boolean compressionEnabled;
private final boolean chunkedTransferEncodingEnabled;

RestClient(
CloseableHttpAsyncClient client,
Expand All @@ -141,6 +143,20 @@ public class RestClient implements Closeable {
NodeSelector nodeSelector,
boolean strictDeprecationMode,
boolean compressionEnabled
) {
this(client, defaultHeaders, nodes, pathPrefix, failureListener, nodeSelector, strictDeprecationMode, compressionEnabled, true);
}

RestClient(
CloseableHttpAsyncClient client,
Header[] defaultHeaders,
List<Node> nodes,
String pathPrefix,
FailureListener failureListener,
NodeSelector nodeSelector,
boolean strictDeprecationMode,
boolean compressionEnabled,
boolean chunkedTransferEncodingEnabled
) {
this.client = client;
this.defaultHeaders = Collections.unmodifiableList(Arrays.asList(defaultHeaders));
Expand All @@ -149,6 +165,7 @@ public class RestClient implements Closeable {
this.nodeSelector = nodeSelector;
this.warningsHandler = strictDeprecationMode ? WarningsHandler.STRICT : WarningsHandler.PERMISSIVE;
this.compressionEnabled = compressionEnabled;
this.chunkedTransferEncodingEnabled = chunkedTransferEncodingEnabled;
setNodes(nodes);
}

Expand Down Expand Up @@ -583,36 +600,53 @@ private static void addSuppressedException(Exception suppressedException, Except
}
}

private static HttpRequestBase createHttpRequest(String method, URI uri, HttpEntity entity, boolean compressionEnabled) {
private static HttpRequestBase createHttpRequest(
String method,
URI uri,
HttpEntity entity,
boolean compressionEnabled,
boolean chunkedTransferEncodingEnabled
) {
switch (method.toUpperCase(Locale.ROOT)) {
case HttpDeleteWithEntity.METHOD_NAME:
return addRequestBody(new HttpDeleteWithEntity(uri), entity, compressionEnabled);
return addRequestBody(new HttpDeleteWithEntity(uri), entity, compressionEnabled, chunkedTransferEncodingEnabled);
case HttpGetWithEntity.METHOD_NAME:
return addRequestBody(new HttpGetWithEntity(uri), entity, compressionEnabled);
return addRequestBody(new HttpGetWithEntity(uri), entity, compressionEnabled, chunkedTransferEncodingEnabled);
case HttpHead.METHOD_NAME:
return addRequestBody(new HttpHead(uri), entity, compressionEnabled);
return addRequestBody(new HttpHead(uri), entity, compressionEnabled, chunkedTransferEncodingEnabled);
case HttpOptions.METHOD_NAME:
return addRequestBody(new HttpOptions(uri), entity, compressionEnabled);
return addRequestBody(new HttpOptions(uri), entity, compressionEnabled, chunkedTransferEncodingEnabled);
case HttpPatch.METHOD_NAME:
return addRequestBody(new HttpPatch(uri), entity, compressionEnabled);
return addRequestBody(new HttpPatch(uri), entity, compressionEnabled, chunkedTransferEncodingEnabled);
case HttpPost.METHOD_NAME:
HttpPost httpPost = new HttpPost(uri);
addRequestBody(httpPost, entity, compressionEnabled);
addRequestBody(httpPost, entity, compressionEnabled, chunkedTransferEncodingEnabled);
return httpPost;
case HttpPut.METHOD_NAME:
return addRequestBody(new HttpPut(uri), entity, compressionEnabled);
return addRequestBody(new HttpPut(uri), entity, compressionEnabled, chunkedTransferEncodingEnabled);
case HttpTrace.METHOD_NAME:
return addRequestBody(new HttpTrace(uri), entity, compressionEnabled);
return addRequestBody(new HttpTrace(uri), entity, compressionEnabled, chunkedTransferEncodingEnabled);
default:
throw new UnsupportedOperationException("http method not supported: " + method);
}
}

private static HttpRequestBase addRequestBody(HttpRequestBase httpRequest, HttpEntity entity, boolean compressionEnabled) {
private static HttpRequestBase addRequestBody(
HttpRequestBase httpRequest,
HttpEntity entity,
boolean compressionEnabled,
boolean chunkedTransferEncodingEnabled
) {
if (entity != null) {
if (httpRequest instanceof HttpEntityEnclosingRequestBase) {
if (compressionEnabled) {
entity = new ContentCompressingEntity(entity);
if (chunkedTransferEncodingEnabled) {
entity = new ContentCompressingChunkedEntity(entity);
} else {
entity = new ContentCompressingEntity(entity);
}
} else if (chunkedTransferEncodingEnabled) {
entity = new ChunkedHttpEntity(entity);
}
((HttpEntityEnclosingRequestBase) httpRequest).setEntity(entity);
} else {
Expand Down Expand Up @@ -782,7 +816,13 @@ private class InternalRequest {
String ignoreString = params.remove("ignore");
this.ignoreErrorCodes = getIgnoreErrorCodes(ignoreString, request.getMethod());
URI uri = buildUri(pathPrefix, request.getEndpoint(), params);
this.httpRequest = createHttpRequest(request.getMethod(), uri, request.getEntity(), compressionEnabled);
this.httpRequest = createHttpRequest(
request.getMethod(),
uri,
request.getEntity(),
compressionEnabled,
chunkedTransferEncodingEnabled
);
this.cancellable = Cancellable.fromRequest(httpRequest);
setHeaders(httpRequest, request.getOptions().getHeaders());
setRequestConfig(httpRequest, request.getOptions().getRequestConfig());
Expand Down Expand Up @@ -932,6 +972,30 @@ private static Exception extractAndWrapCause(Exception exception) {
return new RuntimeException("error while performing request", exception);
}

/**
* A gzip compressing entity that also implements {@code getContent()}.
*/
public static class ContentCompressingChunkedEntity extends GzipCompressingEntity {

/**
* Creates a {@link ContentCompressingChunkedEntity} instance with the provided HTTP entity.
*
* @param entity the HTTP entity.
*/
public ContentCompressingChunkedEntity(HttpEntity entity) {
super(entity);
}

@Override
public InputStream getContent() throws IOException {
ByteArrayInputOutputStream out = new ByteArrayInputOutputStream(1024);
try (GZIPOutputStream gzipOut = new GZIPOutputStream(out)) {
wrappedEntity.writeTo(gzipOut);
}
return out.asInput();
}
}

/**
* A gzip compressing entity that also implements {@code getContent()}.
*/
Expand All @@ -954,6 +1018,56 @@ public InputStream getContent() throws IOException {
}
return out.asInput();
}

/**
* A gzip compressing entity doesn't work with chunked encoding with sigv4
*
* @return false
*/
@Override
public boolean isChunked() {
return false;
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is no longer always the case. A compressing entity can be both chunked or not chunked.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes but we require that flag to tell it's a chunked or not and set correct headers


/**
* A gzip entity requires content length in http headers
* as it doesn't work with chunked encoding for sigv4
*
* @return content length of gzip entity
*/
@Override
public long getContentLength() {
long size;
try (InputStream is = getContent()) {
size = is.readAllBytes().length;
} catch (IOException ex) {
size = -1L;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there other places where we return -1 when we cannot figure out the content length. I would expect an exception, not a negative number causing all kinds of wonderful problems of doing x.size() + content.getContentLength().

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes that why I think @reta said to return -1.
I don't see any other place where we're doing x.size + content.getContentLength();
or returning -1

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's fine. The interface would have been "unsigned long" if it was meant to be otherwise.

}

return size;
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a way to get the size of the compressed entity directly without reading all the bytes?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

didn't see any method that can gives us that info directly

}

public static class ChunkedHttpEntity extends HttpEntityWrapper {
/**
* Creates a {@link ChunkedHttpEntity} instance with the provided HTTP entity.
*
* @param entity the HTTP entity.
*/
public ChunkedHttpEntity(HttpEntity entity) {
super(entity);
}

/**
* A chunked entity requires transfer-encoding:chunked in http headers
* which requires isChunked to be true
*
* @return true
*/
@Override
public boolean isChunked() {
return true;
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ public final class RestClientBuilder {
private NodeSelector nodeSelector = NodeSelector.ANY;
private boolean strictDeprecationMode = false;
private boolean compressionEnabled = false;
private boolean chunkedTransferEncodingEnabled = true;

/**
* Creates a new builder instance and sets the hosts that the client will send requests to.
Expand Down Expand Up @@ -238,6 +239,16 @@ public RestClientBuilder setCompressionEnabled(boolean compressionEnabled) {
return this;
}

/**
* Whether the REST client should use Transfer-Encoding: chunked for requests or not"
*
* @param chunkedTransferEncodingEnabled flag for enabling Transfer-Encoding: chunked
*/
public RestClientBuilder setChunkedTransferEncodingEnabled(boolean chunkedTransferEncodingEnabled) {
this.chunkedTransferEncodingEnabled = chunkedTransferEncodingEnabled;
return this;
}

/**
* Creates a new {@link RestClient} based on the provided configuration.
*/
Expand All @@ -256,7 +267,8 @@ public RestClient build() {
failureListener,
nodeSelector,
strictDeprecationMode,
compressionEnabled
compressionEnabled,
chunkedTransferEncodingEnabled
);
httpClient.start();
return restClient;
Expand Down
Loading