-
Notifications
You must be signed in to change notification settings - Fork 1.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix for bug: https://github.com/opensearch-project/OpenSearch/issues/… #3665
Changes from all commits
45ec5ed
d023308
65a590c
303e408
e0b3615
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -36,6 +36,7 @@ | |
import org.apache.http.ConnectionClosedException; | ||
import org.apache.http.Header; | ||
import org.apache.http.HttpEntity; | ||
import org.apache.http.entity.HttpEntityWrapper; | ||
import org.apache.http.HttpHost; | ||
import org.apache.http.HttpRequest; | ||
import org.apache.http.HttpResponse; | ||
|
@@ -131,6 +132,7 @@ public class RestClient implements Closeable { | |
private volatile NodeTuple<List<Node>> nodeTuple; | ||
private final WarningsHandler warningsHandler; | ||
private final boolean compressionEnabled; | ||
private final boolean chunkedTransferEncodingEnabled; | ||
|
||
RestClient( | ||
CloseableHttpAsyncClient client, | ||
|
@@ -141,6 +143,20 @@ public class RestClient implements Closeable { | |
NodeSelector nodeSelector, | ||
boolean strictDeprecationMode, | ||
boolean compressionEnabled | ||
) { | ||
this(client, defaultHeaders, nodes, pathPrefix, failureListener, nodeSelector, strictDeprecationMode, compressionEnabled, true); | ||
} | ||
|
||
RestClient( | ||
CloseableHttpAsyncClient client, | ||
Header[] defaultHeaders, | ||
List<Node> nodes, | ||
String pathPrefix, | ||
FailureListener failureListener, | ||
NodeSelector nodeSelector, | ||
boolean strictDeprecationMode, | ||
boolean compressionEnabled, | ||
boolean chunkedTransferEncodingEnabled | ||
) { | ||
this.client = client; | ||
this.defaultHeaders = Collections.unmodifiableList(Arrays.asList(defaultHeaders)); | ||
|
@@ -149,6 +165,7 @@ public class RestClient implements Closeable { | |
this.nodeSelector = nodeSelector; | ||
this.warningsHandler = strictDeprecationMode ? WarningsHandler.STRICT : WarningsHandler.PERMISSIVE; | ||
this.compressionEnabled = compressionEnabled; | ||
this.chunkedTransferEncodingEnabled = chunkedTransferEncodingEnabled; | ||
setNodes(nodes); | ||
} | ||
|
||
|
@@ -583,36 +600,53 @@ private static void addSuppressedException(Exception suppressedException, Except | |
} | ||
} | ||
|
||
private static HttpRequestBase createHttpRequest(String method, URI uri, HttpEntity entity, boolean compressionEnabled) { | ||
private static HttpRequestBase createHttpRequest( | ||
String method, | ||
URI uri, | ||
HttpEntity entity, | ||
boolean compressionEnabled, | ||
boolean chunkedTransferEncodingEnabled | ||
) { | ||
switch (method.toUpperCase(Locale.ROOT)) { | ||
case HttpDeleteWithEntity.METHOD_NAME: | ||
return addRequestBody(new HttpDeleteWithEntity(uri), entity, compressionEnabled); | ||
return addRequestBody(new HttpDeleteWithEntity(uri), entity, compressionEnabled, chunkedTransferEncodingEnabled); | ||
case HttpGetWithEntity.METHOD_NAME: | ||
return addRequestBody(new HttpGetWithEntity(uri), entity, compressionEnabled); | ||
return addRequestBody(new HttpGetWithEntity(uri), entity, compressionEnabled, chunkedTransferEncodingEnabled); | ||
case HttpHead.METHOD_NAME: | ||
return addRequestBody(new HttpHead(uri), entity, compressionEnabled); | ||
return addRequestBody(new HttpHead(uri), entity, compressionEnabled, chunkedTransferEncodingEnabled); | ||
case HttpOptions.METHOD_NAME: | ||
return addRequestBody(new HttpOptions(uri), entity, compressionEnabled); | ||
return addRequestBody(new HttpOptions(uri), entity, compressionEnabled, chunkedTransferEncodingEnabled); | ||
case HttpPatch.METHOD_NAME: | ||
return addRequestBody(new HttpPatch(uri), entity, compressionEnabled); | ||
return addRequestBody(new HttpPatch(uri), entity, compressionEnabled, chunkedTransferEncodingEnabled); | ||
case HttpPost.METHOD_NAME: | ||
HttpPost httpPost = new HttpPost(uri); | ||
addRequestBody(httpPost, entity, compressionEnabled); | ||
addRequestBody(httpPost, entity, compressionEnabled, chunkedTransferEncodingEnabled); | ||
return httpPost; | ||
case HttpPut.METHOD_NAME: | ||
return addRequestBody(new HttpPut(uri), entity, compressionEnabled); | ||
return addRequestBody(new HttpPut(uri), entity, compressionEnabled, chunkedTransferEncodingEnabled); | ||
case HttpTrace.METHOD_NAME: | ||
return addRequestBody(new HttpTrace(uri), entity, compressionEnabled); | ||
return addRequestBody(new HttpTrace(uri), entity, compressionEnabled, chunkedTransferEncodingEnabled); | ||
default: | ||
throw new UnsupportedOperationException("http method not supported: " + method); | ||
} | ||
} | ||
|
||
private static HttpRequestBase addRequestBody(HttpRequestBase httpRequest, HttpEntity entity, boolean compressionEnabled) { | ||
private static HttpRequestBase addRequestBody( | ||
HttpRequestBase httpRequest, | ||
HttpEntity entity, | ||
boolean compressionEnabled, | ||
boolean chunkedTransferEncodingEnabled | ||
) { | ||
if (entity != null) { | ||
if (httpRequest instanceof HttpEntityEnclosingRequestBase) { | ||
if (compressionEnabled) { | ||
entity = new ContentCompressingEntity(entity); | ||
if (chunkedTransferEncodingEnabled) { | ||
entity = new ContentCompressingChunkedEntity(entity); | ||
} else { | ||
entity = new ContentCompressingEntity(entity); | ||
} | ||
} else if (chunkedTransferEncodingEnabled) { | ||
entity = new ChunkedHttpEntity(entity); | ||
} | ||
((HttpEntityEnclosingRequestBase) httpRequest).setEntity(entity); | ||
} else { | ||
|
@@ -782,7 +816,13 @@ private class InternalRequest { | |
String ignoreString = params.remove("ignore"); | ||
this.ignoreErrorCodes = getIgnoreErrorCodes(ignoreString, request.getMethod()); | ||
URI uri = buildUri(pathPrefix, request.getEndpoint(), params); | ||
this.httpRequest = createHttpRequest(request.getMethod(), uri, request.getEntity(), compressionEnabled); | ||
this.httpRequest = createHttpRequest( | ||
request.getMethod(), | ||
uri, | ||
request.getEntity(), | ||
compressionEnabled, | ||
chunkedTransferEncodingEnabled | ||
); | ||
this.cancellable = Cancellable.fromRequest(httpRequest); | ||
setHeaders(httpRequest, request.getOptions().getHeaders()); | ||
setRequestConfig(httpRequest, request.getOptions().getRequestConfig()); | ||
|
@@ -932,6 +972,30 @@ private static Exception extractAndWrapCause(Exception exception) { | |
return new RuntimeException("error while performing request", exception); | ||
} | ||
|
||
/** | ||
* A gzip compressing entity that also implements {@code getContent()}. | ||
*/ | ||
public static class ContentCompressingChunkedEntity extends GzipCompressingEntity { | ||
|
||
/** | ||
* Creates a {@link ContentCompressingChunkedEntity} instance with the provided HTTP entity. | ||
* | ||
* @param entity the HTTP entity. | ||
*/ | ||
public ContentCompressingChunkedEntity(HttpEntity entity) { | ||
super(entity); | ||
} | ||
|
||
@Override | ||
public InputStream getContent() throws IOException { | ||
ByteArrayInputOutputStream out = new ByteArrayInputOutputStream(1024); | ||
try (GZIPOutputStream gzipOut = new GZIPOutputStream(out)) { | ||
wrappedEntity.writeTo(gzipOut); | ||
} | ||
return out.asInput(); | ||
} | ||
} | ||
|
||
/** | ||
* A gzip compressing entity that also implements {@code getContent()}. | ||
*/ | ||
|
@@ -954,6 +1018,56 @@ public InputStream getContent() throws IOException { | |
} | ||
return out.asInput(); | ||
} | ||
|
||
/** | ||
* A gzip compressing entity doesn't work with chunked encoding with sigv4 | ||
* | ||
* @return false | ||
*/ | ||
@Override | ||
public boolean isChunked() { | ||
return false; | ||
} | ||
|
||
/** | ||
* A gzip entity requires content length in http headers | ||
* as it doesn't work with chunked encoding for sigv4 | ||
* | ||
* @return content length of gzip entity | ||
*/ | ||
@Override | ||
public long getContentLength() { | ||
long size; | ||
try (InputStream is = getContent()) { | ||
size = is.readAllBytes().length; | ||
} catch (IOException ex) { | ||
size = -1L; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is there other places where we return There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yes that why I think @reta said to return -1. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's fine. The interface would have been "unsigned long" if it was meant to be otherwise. |
||
} | ||
|
||
return size; | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is there a way to get the size of the compressed entity directly without reading all the bytes? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. didn't see any method that can gives us that info directly |
||
} | ||
|
||
public static class ChunkedHttpEntity extends HttpEntityWrapper { | ||
/** | ||
* Creates a {@link ChunkedHttpEntity} instance with the provided HTTP entity. | ||
* | ||
* @param entity the HTTP entity. | ||
*/ | ||
public ChunkedHttpEntity(HttpEntity entity) { | ||
super(entity); | ||
} | ||
|
||
/** | ||
* A chunked entity requires transfer-encoding:chunked in http headers | ||
* which requires isChunked to be true | ||
* | ||
* @return true | ||
*/ | ||
@Override | ||
public boolean isChunked() { | ||
return true; | ||
} | ||
} | ||
|
||
/** | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is no longer always the case. A compressing entity can be both chunked or not chunked.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes but we require that flag to tell it's a chunked or not and set correct headers