Skip to content

Commit

Permalink
Add delegating response converter function provider (#4336)
Browse files Browse the repository at this point in the history
Potentially resolves issue [4301](#4301 (comment))


Motivation:

Fix bug with Protobuf response converter. The fix now allows us to use a custom converter, while still preserving desired behaviour in other areas.

Please see the discussion issue 4301 for more information.

Modifications:

- Take functionality from `AnnotatedService` into its own `ResponseConverterFunctionSelector` class. This isolates the behaviour of selecting response converters and lets us test its functionality in isolation.
- Tests regarding desired order of converter usage
- Test to ensure the bug with the Protobuf response converter is fixed
- Split existing `ResponseConverterFunctionProvider` functionality into two separate interfaces --- `ResponseConverterFunctionProvider` and `DelegatingResponseConverterFunctionProvider`. These interfaces have different expectations in terms of parameters and usage of passed-in parameters
- Several later commits involve resolving merge conflicts which were introduced because there were changes in the master branch to the code which we had pulled out into `ResponseConverterFunctionSelector` in this branch

Result:

- Closes #4301.
- You will now be able to use a custom JsonFormat.Printer for ProtobufResponseConverterFunction 
- You will have to update any custom SPI implementations to conform to either `ResponseConverterFunctionProvider` or `DelegatingResponseConverterFunctionProvider`
  • Loading branch information
resquivel-squareup authored Sep 5, 2022
1 parent 89e3b6e commit 70d410d
Show file tree
Hide file tree
Showing 26 changed files with 580 additions and 155 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
* Copyright 2022 LINE Corporation
*
* LINE Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.linecorp.armeria.internal.server.annotation;

import static com.linecorp.armeria.internal.common.util.ObjectCollectingUtil.collectFrom;
import static com.linecorp.armeria.internal.server.annotation.ClassUtil.typeToClass;
import static com.linecorp.armeria.internal.server.annotation.ClassUtil.unwrapUnaryAsyncType;

import java.lang.reflect.Type;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Stream;

import org.reactivestreams.Publisher;

import com.linecorp.armeria.common.HttpHeaders;
import com.linecorp.armeria.common.HttpResponse;
import com.linecorp.armeria.common.MediaType;
import com.linecorp.armeria.common.ResponseHeaders;
import com.linecorp.armeria.common.annotation.Nullable;
import com.linecorp.armeria.common.util.Exceptions;
import com.linecorp.armeria.server.ServiceRequestContext;
import com.linecorp.armeria.server.annotation.ResponseConverterFunction;

/**
* A response converter implementation which creates an {@link HttpResponse} with
* the objects published from a {@link Publisher} or {@link Stream}.
*/
final class AggregatedResponseConverterFunction implements ResponseConverterFunction {

private final ResponseConverterFunction responseConverter;

AggregatedResponseConverterFunction(ResponseConverterFunction responseConverter) {
this.responseConverter = responseConverter;
}

@Override
public Boolean isResponseStreaming(Type returnType, @Nullable MediaType contentType) {
final Class<?> clazz = typeToClass(unwrapUnaryAsyncType(returnType));
if (clazz == null) {
return null;
}

if (HttpResponse.class.isAssignableFrom(clazz)) {
return true;
}
if (Publisher.class.isAssignableFrom(clazz) || Stream.class.isAssignableFrom(clazz)) {
return false;
}

return null;
}

@Override
@SuppressWarnings("unchecked")
public HttpResponse convertResponse(ServiceRequestContext ctx, ResponseHeaders headers,
@Nullable Object result, HttpHeaders trailers) throws Exception {
final CompletableFuture<?> f;
if (result instanceof Publisher) {
f = collectFrom((Publisher<Object>) result, ctx);
} else if (result instanceof Stream) {
f = collectFrom((Stream<Object>) result, ctx.blockingTaskExecutor());
} else {
return ResponseConverterFunction.fallthrough();
}

return HttpResponse.from(f.thenApply(aggregated -> {
try {
return responseConverter.convertResponse(ctx, headers, aggregated, trailers);
} catch (Exception ex) {
return Exceptions.throwUnsafely(ex);
}
}));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,7 @@
package com.linecorp.armeria.internal.server.annotation;

import static com.google.common.base.Preconditions.checkArgument;
import static com.linecorp.armeria.internal.common.util.ObjectCollectingUtil.collectFrom;
import static com.linecorp.armeria.internal.server.annotation.ClassUtil.typeToClass;
import static com.linecorp.armeria.internal.server.annotation.ClassUtil.unwrapUnaryAsyncType;
import static com.linecorp.armeria.internal.server.annotation.ResponseConverterFunctionUtil.newResponseConverter;
import static java.util.Objects.requireNonNull;

import java.lang.invoke.MethodHandle;
Expand All @@ -29,19 +27,15 @@
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.List;
import java.util.ServiceLoader;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Function;
import java.util.stream.Stream;

import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;

import com.linecorp.armeria.common.AggregatedHttpResponse;
Expand All @@ -68,18 +62,13 @@
import com.linecorp.armeria.server.RoutingContext;
import com.linecorp.armeria.server.ServiceRequestContext;
import com.linecorp.armeria.server.SimpleDecoratingHttpService;
import com.linecorp.armeria.server.annotation.ByteArrayResponseConverterFunction;
import com.linecorp.armeria.server.annotation.ExceptionHandlerFunction;
import com.linecorp.armeria.server.annotation.ExceptionVerbosity;
import com.linecorp.armeria.server.annotation.FallthroughException;
import com.linecorp.armeria.server.annotation.HttpFileResponseConverterFunction;
import com.linecorp.armeria.server.annotation.HttpResult;
import com.linecorp.armeria.server.annotation.JacksonResponseConverterFunction;
import com.linecorp.armeria.server.annotation.Path;
import com.linecorp.armeria.server.annotation.ResponseConverterFunction;
import com.linecorp.armeria.server.annotation.ResponseConverterFunctionProvider;
import com.linecorp.armeria.server.annotation.ServiceName;
import com.linecorp.armeria.server.annotation.StringResponseConverterFunction;

/**
* An {@link HttpService} which is defined by a {@link Path} or HTTP method annotations.
Expand All @@ -94,31 +83,11 @@ public final class AnnotatedService implements HttpService {
*/
private static final String CGLIB_CLASS_SEPARATOR = "$$";

/**
* A default {@link ResponseConverterFunction}s.
*/
private static final List<ResponseConverterFunction> defaultResponseConverters =
ImmutableList.of(new JacksonResponseConverterFunction(),
new StringResponseConverterFunction(),
new ByteArrayResponseConverterFunction(),
new HttpFileResponseConverterFunction());

private static final MethodHandles.Lookup lookup = MethodHandles.lookup();

private static final CompletableFuture<AggregatedResult>
NO_AGGREGATION_FUTURE = UnmodifiableFuture.completedFuture(AggregatedResult.EMPTY);

static final List<ResponseConverterFunctionProvider> responseConverterFunctionProviders =
ImmutableList.copyOf(ServiceLoader.load(ResponseConverterFunctionProvider.class,
AnnotatedService.class.getClassLoader()));

static {
if (!responseConverterFunctionProviders.isEmpty()) {
logger.debug("Available {}s: {}", ResponseConverterFunctionProvider.class.getSimpleName(),
responseConverterFunctionProviders);
}
}

private final Object object;
private final Method method;
private final MethodHandle methodHandle;
Expand Down Expand Up @@ -168,7 +137,7 @@ public final class AnnotatedService implements HttpService {
}

actualReturnType = getActualReturnType(method);
responseConverter = responseConverter(
responseConverter = newResponseConverter(
actualReturnType, requireNonNull(responseConverters, "responseConverters"));
aggregationStrategy = AggregationStrategy.from(resolvers);
this.route = requireNonNull(route, "route");
Expand Down Expand Up @@ -208,36 +177,6 @@ public final class AnnotatedService implements HttpService {
methodHandle = asMethodHandle(method, object);
}

private static ResponseConverterFunction responseConverter(
Type returnType, List<ResponseConverterFunction> responseConverters) {

final ImmutableList<ResponseConverterFunction> backingConverters =
ImmutableList
.<ResponseConverterFunction>builder()
.addAll(responseConverters)
.addAll(defaultResponseConverters)
.build();
final ResponseConverterFunction responseConverter = new CompositeResponseConverterFunction(
ImmutableList
.<ResponseConverterFunction>builder()
.addAll(backingConverters)
// It is the last converter to try to convert the result object into an HttpResponse
// after aggregating the published object from a Publisher or Stream.
.add(new AggregatedResponseConverterFunction(
new CompositeResponseConverterFunction(backingConverters)))
.build());

for (final ResponseConverterFunctionProvider provider : responseConverterFunctionProviders) {
final ResponseConverterFunction func =
provider.createResponseConverterFunction(returnType, responseConverter);
if (func != null) {
return func;
}
}

return responseConverter;
}

private static Type getActualReturnType(Method method) {
final Class<?> returnType;
final Type genericReturnType;
Expand Down Expand Up @@ -597,62 +536,6 @@ public HttpResponse serve(ServiceRequestContext ctx, HttpRequest req) {
}
}

/**
* A response converter implementation which creates an {@link HttpResponse} with
* the objects published from a {@link Publisher} or {@link Stream}.
*/
@VisibleForTesting
static final class AggregatedResponseConverterFunction implements ResponseConverterFunction {

private final ResponseConverterFunction responseConverter;

AggregatedResponseConverterFunction(ResponseConverterFunction responseConverter) {
this.responseConverter = responseConverter;
}

@Override
public Boolean isResponseStreaming(Type returnType, @Nullable MediaType contentType) {
final Class<?> clazz = typeToClass(unwrapUnaryAsyncType(returnType));
if (clazz == null) {
return null;
}

if (HttpResponse.class.isAssignableFrom(clazz)) {
return true;
}
if (Publisher.class.isAssignableFrom(clazz) || Stream.class.isAssignableFrom(clazz)) {
return false;
}

return null;
}

@Override
@SuppressWarnings("unchecked")
public HttpResponse convertResponse(ServiceRequestContext ctx,
ResponseHeaders headers,
@Nullable Object result,
HttpHeaders trailers) throws Exception {
final CompletableFuture<?> f;
if (result instanceof Publisher) {
f = collectFrom((Publisher<Object>) result, ctx);
} else if (result instanceof Stream) {
f = collectFrom((Stream<Object>) result, ctx.blockingTaskExecutor());
} else {
return ResponseConverterFunction.fallthrough();
}

assert f != null;
return HttpResponse.from(f.thenApply(aggregated -> {
try {
return responseConverter.convertResponse(ctx, headers, aggregated, trailers);
} catch (Exception ex) {
return Exceptions.throwUnsafely(ex);
}
}));
}
}

/**
* Returns the user-defined class for the given class: usually simply the given class,
* but the original class in case of a CGLIB-generated subclass.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
/*
* Copyright 2022 LINE Corporation
*
* LINE Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.linecorp.armeria.internal.server.annotation;

import static com.google.common.collect.ImmutableList.toImmutableList;

import java.lang.reflect.Type;
import java.util.List;
import java.util.Objects;
import java.util.ServiceLoader;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.collect.ImmutableList;

import com.linecorp.armeria.server.annotation.ByteArrayResponseConverterFunction;
import com.linecorp.armeria.server.annotation.DelegatingResponseConverterFunctionProvider;
import com.linecorp.armeria.server.annotation.HttpFileResponseConverterFunction;
import com.linecorp.armeria.server.annotation.JacksonResponseConverterFunction;
import com.linecorp.armeria.server.annotation.ResponseConverterFunction;
import com.linecorp.armeria.server.annotation.ResponseConverterFunctionProvider;
import com.linecorp.armeria.server.annotation.StringResponseConverterFunction;

final class ResponseConverterFunctionUtil {

private static final Logger logger = LoggerFactory.getLogger(ResponseConverterFunctionUtil.class);

/**
* The default {@link ResponseConverterFunction}s.
*/
private static final List<ResponseConverterFunction> defaultResponseConverters = ImmutableList.of(
new JacksonResponseConverterFunction(), new StringResponseConverterFunction(),
new ByteArrayResponseConverterFunction(), new HttpFileResponseConverterFunction());

private static final List<ResponseConverterFunctionProvider> responseConverterProviders =
ImmutableList.copyOf(ServiceLoader.load(ResponseConverterFunctionProvider.class,
ResponseConverterFunctionUtil.class.getClassLoader()));

private static final List<DelegatingResponseConverterFunctionProvider>
delegatingResponseConverterProviders = ImmutableList.copyOf(
ServiceLoader.load(DelegatingResponseConverterFunctionProvider.class,
ResponseConverterFunctionUtil.class.getClassLoader()));

static {
if (!responseConverterProviders.isEmpty()) {
logger.debug("Available {}s: {}", ResponseConverterFunctionProvider.class.getSimpleName(),
responseConverterProviders);
}
if (!delegatingResponseConverterProviders.isEmpty()) {
logger.debug("Available {}s: {}", DelegatingResponseConverterFunctionProvider.class.getSimpleName(),
delegatingResponseConverterProviders);
}
}

static ResponseConverterFunction newResponseConverter(Type returnType,
List<ResponseConverterFunction> responseConverters) {
final List<ResponseConverterFunction> nonDelegatingSpiConverters =
responseConverterProviders.stream()
.map(provider -> provider.newResponseConverterFunction(returnType))
.filter(Objects::nonNull)
.collect(toImmutableList());

final ImmutableList<ResponseConverterFunction> backingConverters =
ImmutableList.<ResponseConverterFunction>builder()
.addAll(responseConverters)
.addAll(nonDelegatingSpiConverters)
.addAll(defaultResponseConverters)
.build();

final ResponseConverterFunction responseConverter = new CompositeResponseConverterFunction(
ImmutableList.<ResponseConverterFunction>builder()
.addAll(backingConverters)
// It is the last converter to try to convert the result object into an
// HttpResponse after aggregating the published object from a Publisher or Stream.
.add(new AggregatedResponseConverterFunction(
new CompositeResponseConverterFunction(backingConverters))).build());

for (final DelegatingResponseConverterFunctionProvider provider
: delegatingResponseConverterProviders) {
final ResponseConverterFunction func =
provider.createResponseConverterFunction(returnType, responseConverter);
if (func != null) {
return func;
}
}

return responseConverter;
}

private ResponseConverterFunctionUtil() {}
}
Loading

0 comments on commit 70d410d

Please sign in to comment.