Skip to content

Commit

Permalink
Provide a way to fluently convert an response with WebClient and ad…
Browse files Browse the repository at this point in the history
…d `BlockingWebClient` (#4021)

Motivation:

- Improved usability
  - REST API uses a unary call(non-streaming request and response) to send
    and respond to a message.
    As `WebClient` supports streaming requests and responses by default,
    users need to manually aggregate the returned `HttpResponse`.
  - Testing code needs to wait for the result with `.join()` or `.get()`
    to assert the result.
    If we provide `BlockingWebClient`, users and dev can remove a lot of
    boilerplate code.
- Possibility of performance optimization
  - If a request and response type are known before sending a call, we
    can optimize the performance for non-streaming calls.
  - `ExchangeType` introduced in #3956 will be also used at the client side. 

Modifications:

- Add `ResponseAs<T, U>` that defines a way to convert a response into another.
  - Decoders for `byte[]`, `String`, JSON, `File` are provided by default.
- Add `TransformingRequestPreparation<T, R>` that fluently transforms a returns response.
- Add `FutureTransformingRequestPreparation` that is a specialized converter for `CompletableFuture`.
  - Scala's `Future` will be supported via `ScalaResponseAs`.
- Add `BlockingWebClient` that returns `AggregatedHttpResponse` by default.
- Add `InvalidHttpResponseException` for notifying a failure of JSON decoding.
- Introduce `{Request,Response}Entity<T>` for express a decoded type of an HTTP content.
- Migrate some tests code for proof of concept.

Result:

- You can now fluently convert a response using `WebClient`.
  ```java
  WebClient client = WebClient.of("api.example.com");
  CompletableFuture<ResponseEntity<MyObject>> response =
     client.prepare()
          .get("/v1/items/1")
          .asJson(MyObject.class)
          .execute();
  ```
- You can now use `BlockingWebClient` to wait for a response to be completed.
  ```java
  BlockingWebClient client = BlockingWebClient.of("https://api.example.com");
  ResponseEntity<MyObject> response =
      client.prepare()
            .get("/v1/items/1")
            .asJson(MyObject.class)
            .execute();
  ```
  • Loading branch information
ikhoon authored Jan 27, 2022
1 parent 39feaf4 commit ccf630c
Show file tree
Hide file tree
Showing 76 changed files with 4,671 additions and 781 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ protected void closeClient(WebClient client) {

@Override
protected void get(WebClient client, String pathIncludingQuery) {
client.get(pathIncludingQuery).aggregate().join();
client.blocking().get(pathIncludingQuery);
}

@Override
Expand All @@ -172,12 +172,12 @@ protected void get(WebClient client, String path, BiConsumer<Integer, Throwable>

@Override
protected void post(WebClient client, String pathIncludingQuery, String body) {
client.post(pathIncludingQuery, body).aggregate().join();
client.blocking().post(pathIncludingQuery, body);
}

@Override
protected void options(WebClient client, String path) {
client.options(path).aggregate().join();
client.blocking().options(path);
}

static ServiceRequestContext serverContext() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;

import com.linecorp.armeria.client.BlockingWebClient;
import com.linecorp.armeria.client.ClientFactory;
import com.linecorp.armeria.client.Clients;
import com.linecorp.armeria.client.InvalidResponseHeadersException;
Expand Down Expand Up @@ -263,29 +264,31 @@ private static Tracing newTracing(String name) {
void testTimingAnnotations() {
// Use separate client factory to make sure connection is created.
try (ClientFactory clientFactory = ClientFactory.builder().build()) {
final WebClient client = WebClient.builder(server.httpUri())
.factory(clientFactory)
.decorator(BraveClient.newDecorator(newTracing("timed-client")))
.build();
assertThat(client.get("/http").aggregate().join().status()).isEqualTo(HttpStatus.OK);
final BlockingWebClient client =
WebClient.builder(server.httpUri())
.factory(clientFactory)
.decorator(BraveClient.newDecorator(newTracing("timed-client")))
.build()
.blocking();
assertThat(client.get("/http").status()).isEqualTo(HttpStatus.OK);
final MutableSpan[] initialConnectSpans = spanHandler.take(1);
assertThat(initialConnectSpans[0].annotations())
.extracting(Map.Entry::getValue).containsExactlyInAnyOrder(
"connection-acquire.start",
"socket-connect.start",
"socket-connect.end",
"connection-acquire.end",
"ws",
"wr");
"connection-acquire.start",
"socket-connect.start",
"socket-connect.end",
"connection-acquire.end",
"ws",
"wr");

// Make another request which will reuse the connection so no connection timing.
assertThat(client.get("/http").aggregate().join().status()).isEqualTo(HttpStatus.OK);
assertThat(client.get("/http").status()).isEqualTo(HttpStatus.OK);

final MutableSpan[] secondConnectSpans = spanHandler.take(1);
assertThat(secondConnectSpans[0].annotations())
.extracting(Map.Entry::getValue).containsExactlyInAnyOrder(
"ws",
"wr");
"ws",
"wr");
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import org.junit.Rule;
import org.junit.Test;

import com.linecorp.armeria.client.WebClient;
import com.linecorp.armeria.client.BlockingWebClient;
import com.linecorp.armeria.common.AggregatedHttpResponse;
import com.linecorp.armeria.common.HttpHeaderNames;
import com.linecorp.armeria.common.HttpRequest;
Expand Down Expand Up @@ -90,8 +90,8 @@ protected void configure(ServerBuilder sb) throws Exception {

@Test
public void serve1() throws Exception {
final WebClient client = WebClient.of(serverRule.httpUri());
final AggregatedHttpResponse response = client.get("/http-serve").aggregate().get();
final BlockingWebClient client = BlockingWebClient.of(serverRule.httpUri());
final AggregatedHttpResponse response = client.get("/http-serve");
assertThat(response.status()).isEqualTo(HttpStatus.OK);

assertThat(response.headers().contains(HttpHeaderNames.RETRY_AFTER)).isFalse();
Expand All @@ -108,8 +108,8 @@ public void serve1() throws Exception {

@Test
public void throttle1() throws Exception {
final WebClient client = WebClient.of(serverRule.httpUri());
final AggregatedHttpResponse response1 = client.get("/http-throttle1").aggregate().get();
final BlockingWebClient client = BlockingWebClient.of(serverRule.httpUri());
final AggregatedHttpResponse response1 = client.get("/http-throttle1");
assertThat(response1.status()).isEqualTo(HttpStatus.OK);

assertThat(response1.headers().contains(HttpHeaderNames.RETRY_AFTER)).isFalse();
Expand All @@ -121,7 +121,7 @@ public void throttle1() throws Exception {
assertThat(reset1).isBetween(0L, 10L);
assertThat(response1.headers().contains("X-RateLimit-Limit")).isFalse();

final AggregatedHttpResponse response2 = client.get("/http-throttle1").aggregate().get();
final AggregatedHttpResponse response2 = client.get("/http-throttle1");
assertThat(response2.status()).isEqualTo(HttpStatus.TOO_MANY_REQUESTS);

assertThat(response2.headers().contains(HttpHeaderNames.RETRY_AFTER)).isTrue();
Expand All @@ -138,8 +138,8 @@ public void throttle1() throws Exception {

@Test
public void throttle2() throws Exception {
final WebClient client = WebClient.of(serverRule.httpUri());
final AggregatedHttpResponse response1 = client.get("/http-throttle2").aggregate().get();
final BlockingWebClient client = BlockingWebClient.of(serverRule.httpUri());
final AggregatedHttpResponse response1 = client.get("/http-throttle2");
assertThat(response1.status()).isEqualTo(HttpStatus.OK);

assertThat(response1.headers().contains(HttpHeaderNames.RETRY_AFTER)).isFalse();
Expand All @@ -151,7 +151,7 @@ public void throttle2() throws Exception {
assertThat(reset1).isBetween(0L, 10L);
assertThat(response1.headers().get("X-RateLimit-Limit")).isEqualTo("1, 1;window=10");

final AggregatedHttpResponse response2 = client.get("/http-throttle2").aggregate().get();
final AggregatedHttpResponse response2 = client.get("/http-throttle2");
assertThat(response2.status()).isEqualTo(HttpStatus.TOO_MANY_REQUESTS);

assertThat(response2.headers().contains(HttpHeaderNames.RETRY_AFTER, "15")).isTrue();
Expand All @@ -164,8 +164,8 @@ public void throttle2() throws Exception {

@Test
public void throttle3() throws Exception {
final WebClient client = WebClient.of(serverRule.httpUri());
final AggregatedHttpResponse response1 = client.get("/http-throttle3").aggregate().get();
final BlockingWebClient client = BlockingWebClient.of(serverRule.httpUri());
final AggregatedHttpResponse response1 = client.get("/http-throttle3");
assertThat(response1.status()).isEqualTo(HttpStatus.OK);

assertThat(response1.headers().contains(HttpHeaderNames.RETRY_AFTER)).isFalse();
Expand All @@ -174,7 +174,7 @@ public void throttle3() throws Exception {
assertThat(response1.headers().contains("X-RateLimit-Remaining")).isFalse();
assertThat(response1.headers().contains("X-RateLimit-Reset")).isFalse();

final AggregatedHttpResponse response2 = client.get("/http-throttle3").aggregate().get();
final AggregatedHttpResponse response2 = client.get("/http-throttle3");
assertThat(response2.status()).isEqualTo(HttpStatus.TOO_MANY_REQUESTS);

assertThat(response2.headers().contains(HttpHeaderNames.RETRY_AFTER)).isTrue();
Expand All @@ -188,8 +188,8 @@ public void throttle3() throws Exception {

@Test
public void throttle4() throws Exception {
final WebClient client = WebClient.of(serverRule.httpUri());
final AggregatedHttpResponse response1 = client.get("/http-throttle4").aggregate().get();
final BlockingWebClient client = BlockingWebClient.of(serverRule.httpUri());
final AggregatedHttpResponse response1 = client.get("/http-throttle4");
assertThat(response1.status()).isEqualTo(HttpStatus.OK);

assertThat(response1.headers().contains(HttpHeaderNames.RETRY_AFTER)).isFalse();
Expand All @@ -198,7 +198,7 @@ public void throttle4() throws Exception {
assertThat(response1.headers().contains("X-RateLimit-Remaining")).isFalse();
assertThat(response1.headers().contains("X-RateLimit-Reset")).isFalse();

final AggregatedHttpResponse response2 = client.get("/http-throttle4").aggregate().get();
final AggregatedHttpResponse response2 = client.get("/http-throttle4");
assertThat(response2.status()).isEqualTo(HttpStatus.SERVICE_UNAVAILABLE);

assertThat(response2.headers().contains(HttpHeaderNames.RETRY_AFTER)).isTrue();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,26 +18,23 @@
import static com.google.common.collect.ImmutableList.toImmutableList;
import static java.util.Objects.requireNonNull;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.type.CollectionType;
import com.fasterxml.jackson.databind.type.TypeFactory;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.MoreObjects;
import com.google.common.base.Strings;

import com.linecorp.armeria.client.Endpoint;
import com.linecorp.armeria.client.WebClient;
import com.linecorp.armeria.common.HttpEntity;
import com.linecorp.armeria.common.QueryParams;
import com.linecorp.armeria.common.annotation.Nullable;
import com.linecorp.armeria.common.util.Exceptions;
import com.linecorp.armeria.internal.common.PercentEncoder;

/**
Expand All @@ -46,19 +43,16 @@
*/
final class CatalogClient {

private static final CollectionType collectionTypeForNode =
TypeFactory.defaultInstance().constructCollectionType(List.class, Node.class);
private static final TypeReference<List<Node>> collectionTypeForNode = new TypeReference<List<Node>>() {};

static CatalogClient of(ConsulClient consulClient) {
return new CatalogClient(consulClient);
}

private final WebClient client;
private final ObjectMapper mapper;

private CatalogClient(ConsulClient client) {
this.client = client.consulWebClient();
mapper = client.getObjectMapper();
}

/**
Expand Down Expand Up @@ -87,15 +81,11 @@ CompletableFuture<List<Node>> service(String serviceName, @Nullable String datac
if (!params.isEmpty()) {
path.append('?').append(params.toQueryString());
}
return client.get(path.toString())
.aggregate()
.thenApply(response -> {
try {
return mapper.readValue(response.content().array(), collectionTypeForNode);
} catch (IOException e) {
return Exceptions.throwUnsafely(e);
}
});
return client.prepare()
.get(path.toString())
.asJson(collectionTypeForNode)
.as(HttpEntity::content)
.execute();
}

@Nullable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,9 @@ class ConsulClientBuilderTest extends ConsulTestBase {
@Test
void gets403WhenNoToken() throws Exception {
final HttpStatus status = WebClient.of("http://localhost:" + consul().getHttpPort())
.get("/v1/agent/self").aggregate()
.get().status();
.blocking()
.get("/v1/agent/self")
.status();
assertThat(status).isEqualTo(HttpStatus.FORBIDDEN);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import org.junit.jupiter.api.AfterAll;
Expand All @@ -32,8 +31,8 @@

import com.fasterxml.jackson.core.JsonProcessingException;

import com.linecorp.armeria.client.BlockingWebClient;
import com.linecorp.armeria.client.Endpoint;
import com.linecorp.armeria.client.WebClient;
import com.linecorp.armeria.common.HttpMethod;
import com.linecorp.armeria.common.SessionProtocol;
import com.linecorp.armeria.internal.consul.ConsulTestBase;
Expand Down Expand Up @@ -119,67 +118,64 @@ void testThatDefaultCheckMethodIsHead() {
serverRef.set(server);
}).doesNotThrowAnyException();
});
await().atMost(10, TimeUnit.SECONDS)
.untilAsserted(() -> assertThat(
client().healthyEndpoints("testThatDefaultCheckMethodIsHead").join().size()
).isEqualTo(1));
await().untilAsserted(() -> {
assertThat(client().healthyEndpoints("testThatDefaultCheckMethodIsHead").join().size())
.isEqualTo(1);
});
serverRef.get().stop();
}

@Test
void testEndpointsCountOfListeningServiceWithAServerStopAndStart() {
// Checks sample endpoints created when initialized.
await().atMost(10, TimeUnit.SECONDS)
.untilAsserted(() ->
assertThat(client().endpoints(serviceName).join()).hasSameSizeAs(sampleEndpoints));
await().untilAsserted(() -> {
assertThat(client().endpoints(serviceName).join()).hasSameSizeAs(sampleEndpoints);
});

// When we close one server then the listener deregister it automatically from consul agent.
servers.get(0).stop().join();

await().atMost(10, TimeUnit.SECONDS)
.untilAsserted(() -> {
final List<Endpoint> results = client().endpoints(serviceName).join();
assertThat(results).hasSize(sampleEndpoints.size() - 1);
});
await().untilAsserted(() -> {
final List<Endpoint> results = client().endpoints(serviceName).join();
assertThat(results).hasSize(sampleEndpoints.size() - 1);
});

// Endpoints increased after service restart.
servers.get(0).start().join();

await().atMost(10, TimeUnit.SECONDS)
.untilAsserted(() ->
assertThat(client().endpoints(serviceName).join()).hasSameSizeAs(sampleEndpoints));
await().untilAsserted(() -> {
assertThat(client().endpoints(serviceName).join()).hasSameSizeAs(sampleEndpoints);
});
}

@Test
void testHealthyServiceWithAdditionalCheckRule() {
// Checks sample endpoints created when initialized.
await().atMost(10, TimeUnit.SECONDS)
.untilAsserted(() ->
assertThat(client().healthyEndpoints(serviceName).join())
.hasSameSizeAs(sampleEndpoints));
await().untilAsserted(() -> {
assertThat(client().healthyEndpoints(serviceName).join()).hasSameSizeAs(sampleEndpoints);
});

// Make a service to produce 503 error for checking by consul.
final Endpoint firstEndpoint = sampleEndpoints.get(0);
final WebClient webClient = WebClient.of(firstEndpoint.toUri(SessionProtocol.HTTP));
webClient.post("echo", "503").aggregate().join();
final BlockingWebClient webClient = BlockingWebClient.of(firstEndpoint.toUri(SessionProtocol.HTTP));
webClient.post("echo", "503");

// And then, consul marks the service to an unhealthy state.
await().atMost(10, TimeUnit.SECONDS)
.untilAsserted(() ->
assertThat(client().healthyEndpoints(serviceName).join())
.hasSize(sampleEndpoints.size() - 1));
await().untilAsserted(() -> {
assertThat(client().healthyEndpoints(serviceName).join())
.hasSize(sampleEndpoints.size() - 1);
});

// But, the size of endpoints does not changed.
await().atMost(10, TimeUnit.SECONDS)
.untilAsserted(() ->
assertThat(client().endpoints(serviceName).join()).hasSameSizeAs(sampleEndpoints));
await().untilAsserted(() -> {
assertThat(client().endpoints(serviceName).join()).hasSameSizeAs(sampleEndpoints);
});

// Make a service to produce 200 OK for checking by consul.
webClient.post("echo", "200").aggregate().join();
await().atMost(10, TimeUnit.SECONDS)
.untilAsserted(() ->
assertThat(client().healthyEndpoints(serviceName).join())
.hasSameSizeAs(sampleEndpoints));
webClient.post("echo", "200");
await().untilAsserted(() -> {
assertThat(client().healthyEndpoints(serviceName).join()).hasSameSizeAs(sampleEndpoints);
});
}

@Test
Expand All @@ -198,13 +194,11 @@ void testThatTagsAreAdded() {
.build();
server.addListener(listener);
server.start().join();
await().atMost(10, TimeUnit.SECONDS)
.untilAsserted(() -> assertThat(
client().healthyEndpoints("testThatTagsAreAdded", null,
"Service.Tags contains \"v1\"")
.join()
.size()
).isEqualTo(1));
await().untilAsserted(() -> {
assertThat(client().healthyEndpoints("testThatTagsAreAdded", null,
"Service.Tags contains \"v1\"").join())
.hasSize(1);
});
server.stop();
}
}
Loading

0 comments on commit ccf630c

Please sign in to comment.