Skip to content
This repository has been archived by the owner on Sep 26, 2023. It is now read-only.

Commit

Permalink
feat: introduce HttpJsonClientCall, Listeners infrastructure and Serv…
Browse files Browse the repository at this point in the history
…erStreaming support in REST transport (#1599)

This includes the following changes for `HTTP1.1/REST` transport:
1) `HttpJsonClientCall` class (with `HttpJsonClientCall.Listener`) mimicking [io.grpc.ClientCall](https://github.com/grpc/grpc-java/blob/master/api/src/main/java/io/grpc/ClientCall.java#L102) functionality. Most of the complexity of this PR is concentrated in `HttpJsonClientCallImpl` class.
2) The unary callables are rewritten to be based on `HttpJsonClientCall` flow (similarly to how it is already done in gRPC unary calls).
3) Server streaming support for REST transport. The implementation is based on `HttpJsonClientCall` and `HttpJsonClientCall.Listener` (introduced in this PR), similarly to how gRPC streaming is based on `io.grpc.ClientCall` and `io.grpc.ClientCall.Listener` (implemented in [grpc-java](https://github.com/grpc/grpc-java/) library) respectively.

The extreme similarity between `HttpJsonClientCall` call and `io.grpc.ClientCall` is intentional and crucial for consistency of the two transports and also intends simplifying creation and maintenance of multi-transport manual wrappers (like [google-ads-java](https://github.com/googleads/google-ads-java)).

The server streaming abstractions in gax java are all based on the flow control managed by a ClientCall, so having similar set of abstractions in REST transport is necessary to reuse transport-independent portions of streaming logic in gax and maintain identical user-facing streaming surface.

This PR also builds a foundation for the soon-coming [ClientInterceptor](https://github.com/grpc/grpc-java/blob/master/api/src/main/java/io/grpc/ClientInterceptor.java#L42)-like infrastructure in REST transport. This is specifically required to support REST transport in [google-ads-java](https://github.com/googleads/google-ads-java/blob/main/google-ads/src/main/java/com/google/ads/googleads/lib/logging/LoggingInterceptor.java#L42).

REST-based client-side streaming and bidirectional streaming is not implemented by this PR and most likely will never be due to limitations of the `HTTP1.1/REST` protocol compared to `HTTP2/gRPC`.

Most of the java docs in `HttpJsonClientCall` class is a modified version of the java docs from `io.grpc.ClientCall`, which is intentional, because `HttpJsonClientCall` is designed to be as similar to `io.grpc.ClientCall` in both surface and behavior as possible (while the two classes cannot be a part of the same class hierarchy, because they belong to two independent transport layers).

**What server-streaming means in case of REST transport** 
In REST transport server-streaming methods return a JSON array of response messages (i.e. the array element type is the same one used as a returned type in the corresponding method definition in protobuf). The response is provided as as [Chunck-encoded](https://en.wikipedia.org/wiki/Chunked_transfer_encoding) input stream, containing one big JSON array. To parse the json array we rely on [JsonReader](https://github.com/google/gson/blob/master/gson/src/main/java/com/google/gson/stream/JsonReader.java#L191) from gson  library, which gax-httpjson already depended on even prior this PR (check `ProtoMessageJsonStreamIterator` class implementation in this PR for details). Note, we must process elements of the array one-by-one because the size of the full array may be in realm of gigabytes.

_**Note**, ideally I need to split this PR at least in two separate ones: 1) HttpJsonClientCall stuff and unary calls based on it in one PR and then 2) server streaming feature in a second PR. Unfortunately the most reasonable way to test `HttpJsonClientCall` infrastructure is by doing it from server streaming logic beause most of the complexity introduced in HttpJsonClient call is induced by necessity to support streaming workflow in the first place (and to support call interceptors (not part of this PR) as a secondary goal)._

_**Note**, there are a few minor breaking changes in gax-httpjson module (and only there) inroduced in this PR. This should be ok, because unlike gax and gax-grpc, gax-httpjson is not GA yet. The breaking changes are very minor (in the space of `HttpJsonCallContext` and `ManagedHttpJsonChannel`) and are backward-compatible with `java-compute` (the main and only officially supported user of gax-httpjson as of now)._
  • Loading branch information
vam-google authored Jan 21, 2022
1 parent 5081ec6 commit 3c97529
Show file tree
Hide file tree
Showing 36 changed files with 2,889 additions and 622 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ public void testServerStreaming() throws Exception {

streamingCallable.call(DEFAULT_REQUEST, moneyObserver);

latch.await(20, TimeUnit.SECONDS);
Truth.assertThat(latch.await(20, TimeUnit.SECONDS)).isTrue();
Truth.assertThat(moneyObserver.error).isNull();
Truth.assertThat(moneyObserver.response).isEqualTo(DEFAULT_RESPONSE);
}
Expand All @@ -157,13 +157,13 @@ public void testManualFlowControl() throws Exception {

streamingCallable.call(DEFAULT_REQUEST, moneyObserver);

latch.await(500, TimeUnit.MILLISECONDS);
Truth.assertThat(latch.await(500, TimeUnit.MILLISECONDS)).isFalse();
Truth.assertWithMessage("Received response before requesting it")
.that(moneyObserver.response)
.isNull();

moneyObserver.controller.request(1);
latch.await(500, TimeUnit.MILLISECONDS);
Truth.assertThat(latch.await(500, TimeUnit.MILLISECONDS)).isTrue();

Truth.assertThat(moneyObserver.response).isEqualTo(DEFAULT_RESPONSE);
Truth.assertThat(moneyObserver.completed).isTrue();
Expand All @@ -178,7 +178,7 @@ public void testCancelClientCall() throws Exception {

moneyObserver.controller.cancel();
moneyObserver.controller.request(1);
latch.await(500, TimeUnit.MILLISECONDS);
Truth.assertThat(latch.await(500, TimeUnit.MILLISECONDS)).isTrue();

Truth.assertThat(moneyObserver.error).isInstanceOf(CancellationException.class);
Truth.assertThat(moneyObserver.error).hasMessageThat().isEqualTo("User cancelled stream");
Expand All @@ -190,7 +190,7 @@ public void testOnResponseError() throws Throwable {
MoneyObserver moneyObserver = new MoneyObserver(true, latch);

streamingCallable.call(ERROR_REQUEST, moneyObserver);
latch.await(500, TimeUnit.MILLISECONDS);
Truth.assertThat(latch.await(500, TimeUnit.MILLISECONDS)).isTrue();

Truth.assertThat(moneyObserver.error).isInstanceOf(ApiException.class);
Truth.assertThat(((ApiException) moneyObserver.error).getStatusCode().getCode())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import com.google.protobuf.TypeRegistry;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.Reader;
import java.lang.reflect.Type;
import java.nio.charset.StandardCharsets;

Expand Down Expand Up @@ -91,25 +92,28 @@ ApiMessageHttpResponseParser.Builder<ResponseT> newBuilder() {

@Override
public ResponseT parse(InputStream httpResponseBody) {
return parse(httpResponseBody, null);
}

@Override
public ResponseT parse(InputStream httpResponseBody, TypeRegistry registry) {
return parse(new InputStreamReader(httpResponseBody, StandardCharsets.UTF_8), registry);
}

@Override
public ResponseT parse(Reader httpResponseBody, TypeRegistry registry) {
if (getResponseInstance() == null) {
return null;
} else {
Type responseType = getResponseInstance().getClass();
try {
return getResponseMarshaller()
.fromJson(
new InputStreamReader(httpResponseBody, StandardCharsets.UTF_8), responseType);
return getResponseMarshaller().fromJson(httpResponseBody, responseType);
} catch (JsonIOException | JsonSyntaxException e) {
throw new RestSerializationException(e);
}
}
}

@Override
public ResponseT parse(InputStream httpResponseBody, TypeRegistry registry) {
return parse(httpResponseBody);
}

@Override
public String serialize(ResponseT response) {
return getResponseMarshaller().toJson(response);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,18 @@
@AutoValue
/* Method descriptor for messages to be transmitted over HTTP. */
public abstract class ApiMethodDescriptor<RequestT, ResponseT> {
public enum MethodType {
UNARY,
CLIENT_STREAMING,
SERVER_STREAMING,
BIDI_STREAMING,
UNKNOWN;
}

public abstract String getFullMethodName();

public abstract HttpRequestFormatter<RequestT> getRequestFormatter();

@Nullable
public abstract HttpResponseParser<ResponseT> getResponseParser();

/** Return the HTTP method for this request message type. */
Expand All @@ -55,8 +61,11 @@ public abstract class ApiMethodDescriptor<RequestT, ResponseT> {
@Nullable
public abstract PollingRequestFactory<RequestT> getPollingRequestFactory();

public abstract MethodType getType();

public static <RequestT, ResponseT> Builder<RequestT, ResponseT> newBuilder() {
return new AutoValue_ApiMethodDescriptor.Builder<RequestT, ResponseT>();
return new AutoValue_ApiMethodDescriptor.Builder<RequestT, ResponseT>()
.setType(MethodType.UNARY);
}

@AutoValue.Builder
Expand All @@ -78,6 +87,8 @@ public abstract Builder<RequestT, ResponseT> setOperationSnapshotFactory(
public abstract Builder<RequestT, ResponseT> setPollingRequestFactory(
PollingRequestFactory<RequestT> pollingRequestFactory);

public abstract Builder<RequestT, ResponseT> setType(MethodType type);

public abstract ApiMethodDescriptor<RequestT, ResponseT> build();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Copyright 2022 Google LLC
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google LLC nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package com.google.api.gax.httpjson;

import com.google.api.client.http.HttpResponseException;
import com.google.api.gax.rpc.ApiException;
import com.google.api.gax.rpc.ApiExceptionFactory;
import com.google.api.gax.rpc.StatusCode;
import com.google.api.gax.rpc.StatusCode.Code;
import com.google.common.collect.ImmutableSet;
import java.util.Set;
import java.util.concurrent.CancellationException;

class HttpJsonApiExceptionFactory {
private final Set<Code> retryableCodes;

HttpJsonApiExceptionFactory(Set<Code> retryableCodes) {
this.retryableCodes = ImmutableSet.copyOf(retryableCodes);
}

ApiException create(Throwable throwable) {
if (throwable instanceof HttpResponseException) {
HttpResponseException e = (HttpResponseException) throwable;
StatusCode statusCode = HttpJsonStatusCode.of(e.getStatusCode());
boolean canRetry = retryableCodes.contains(statusCode.getCode());
String message = e.getStatusMessage();
return createApiException(throwable, statusCode, message, canRetry);
} else if (throwable instanceof HttpJsonStatusRuntimeException) {
HttpJsonStatusRuntimeException e = (HttpJsonStatusRuntimeException) throwable;
StatusCode statusCode = HttpJsonStatusCode.of(e.getStatusCode());
return createApiException(
throwable, statusCode, e.getMessage(), retryableCodes.contains(statusCode.getCode()));
} else if (throwable instanceof CancellationException) {
return ApiExceptionFactory.createException(
throwable, HttpJsonStatusCode.of(Code.CANCELLED), false);
} else if (throwable instanceof ApiException) {
return (ApiException) throwable;
} else {
// Do not retry on unknown throwable, even when UNKNOWN is in retryableCodes
return ApiExceptionFactory.createException(
throwable, HttpJsonStatusCode.of(StatusCode.Code.UNKNOWN), false);
}
}

private ApiException createApiException(
Throwable throwable, StatusCode statusCode, String message, boolean canRetry) {
return message == null
? ApiExceptionFactory.createException(throwable, statusCode, canRetry)
: ApiExceptionFactory.createException(message, throwable, statusCode, canRetry);
}
}
Loading

0 comments on commit 3c97529

Please sign in to comment.