Skip to content

Commit

Permalink
Change blockingTaskExecutor from Executor to ScheduledExecutorS… (#2269)
Browse files Browse the repository at this point in the history
Motivation:
It will be good if a user can schedule a task using `blockingTaskExecutor`.

Modification:
- Change `blockingTaskExecutor` from `Executor` to `ScheduledExecutorService`.

Result:
- You can now schedule a task using `blockingTaskExecutor`.
- (Breaking) `ServerBuilder.blockingTaskExecutor(Executor executor, ...)` is now taking `ScheduledExecutorService`.
  • Loading branch information
minwoox authored and trustin committed Nov 26, 2019
1 parent 72b1d95 commit 964b7a5
Show file tree
Hide file tree
Showing 20 changed files with 224 additions and 159 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Consumer;
Expand Down Expand Up @@ -87,6 +88,11 @@ public final ExecutorService makeContextAware(ExecutorService executor) {
return RequestContext.super.makeContextAware(executor);
}

@Override
public final ScheduledExecutorService makeContextAware(ScheduledExecutorService executor) {
return RequestContext.super.makeContextAware(executor);
}

@Override
public final <T> Callable<T> makeContextAware(Callable<T> callable) {
return () -> {
Expand Down
22 changes: 10 additions & 12 deletions core/src/main/java/com/linecorp/armeria/common/CommonPools.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,8 @@

package com.linecorp.armeria.common;

import java.util.concurrent.Executor;
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import com.linecorp.armeria.client.ClientFactoryBuilder;
Expand All @@ -33,28 +32,27 @@
*/
public final class CommonPools {

private static final Executor BLOCKING_TASK_EXECUTOR;
private static final ScheduledExecutorService BLOCKING_TASK_EXECUTOR;
private static final EventLoopGroup WORKER_GROUP;

static {
// Threads spawned as needed and reused, with a 60s timeout and unbounded work queue.
final ThreadPoolExecutor blockingTaskExecutor = new ThreadPoolExecutor(
Flags.numCommonBlockingTaskThreads(), Flags.numCommonBlockingTaskThreads(),
60, TimeUnit.SECONDS, new LinkedTransferQueue<>(),
final ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(
Flags.numCommonBlockingTaskThreads(),
ThreadFactories.newThreadFactory("armeria-common-blocking-tasks", true));

blockingTaskExecutor.allowCoreThreadTimeOut(true);
BLOCKING_TASK_EXECUTOR = blockingTaskExecutor;
scheduledThreadPoolExecutor.setKeepAliveTime(60, TimeUnit.SECONDS);
scheduledThreadPoolExecutor.allowCoreThreadTimeOut(true);
BLOCKING_TASK_EXECUTOR = scheduledThreadPoolExecutor;

WORKER_GROUP = EventLoopGroups.newEventLoopGroup(Flags.numCommonWorkers(),
"armeria-common-worker", true);
}

/**
* Returns the default common blocking task {@link Executor} which is used for
* Returns the default common blocking task {@link ScheduledExecutorService} which is used for
* potentially long-running tasks which may block I/O threads.
*/
public static Executor blockingTaskExecutor() {
public static ScheduledExecutorService blockingTaskExecutor() {
return BLOCKING_TASK_EXECUTOR;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Consumer;
Expand Down Expand Up @@ -387,6 +388,14 @@ default ExecutorService makeContextAware(ExecutorService executor) {
return new RequestContextAwareExecutorService(this, executor);
}

/**
* Returns a {@link ScheduledExecutorService} that will execute callbacks in the given {@code executor},
* making sure to propagate the current {@link RequestContext} into the callback execution.
*/
default ScheduledExecutorService makeContextAware(ScheduledExecutorService executor) {
return new RequestContextAwareScheduledExecutorService(this, executor);
}

/**
* Returns a {@link Callable} that makes sure the current {@link RequestContext} is set and then invokes
* the input {@code callable}.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Copyright 2019 LINE Corporation
*
* LINE Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.linecorp.armeria.common;

import java.util.concurrent.Callable;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

final class RequestContextAwareScheduledExecutorService extends RequestContextAwareExecutorService
implements ScheduledExecutorService {

RequestContextAwareScheduledExecutorService(RequestContext context, ScheduledExecutorService delegate) {
super(context, delegate);
}

@Override
ScheduledExecutorService delegate() {
return (ScheduledExecutorService) super.delegate();
}

@Override
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
return delegate().schedule(context().makeContextAware(command), delay, unit);
}

@Override
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
return delegate().schedule(context().makeContextAware(callable), delay, unit);
}

@Override
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period,
TimeUnit unit) {
return delegate().scheduleAtFixedRate(context().makeContextAware(command), initialDelay, period, unit);
}

@Override
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay,
TimeUnit unit) {
return delegate().scheduleWithFixedDelay(context().makeContextAware(command),
initialDelay, delay, unit);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.Function;

Expand Down Expand Up @@ -86,7 +86,7 @@ public class DefaultServiceRequestContext extends NonWrappingRequestContext impl
private final Logger logger;

@Nullable
private ExecutorService blockingTaskExecutor;
private ScheduledExecutorService blockingTaskExecutor;

private long requestTimeoutMillis;
@Nullable
Expand Down Expand Up @@ -309,7 +309,7 @@ public HttpService service() {
}

@Override
public ExecutorService blockingTaskExecutor() {
public ScheduledExecutorService blockingTaskExecutor() {
if (blockingTaskExecutor != null) {
return blockingTaskExecutor;
}
Expand Down
9 changes: 5 additions & 4 deletions core/src/main/java/com/linecorp/armeria/server/Server.java
Original file line number Diff line number Diff line change
Expand Up @@ -534,10 +534,11 @@ private void doStop(CompletableFuture<Void> future,

private void finishDoStop(CompletableFuture<Void> future) {
if (config.shutdownBlockingTaskExecutorOnStop()) {
final ExecutorService executor;
final ExecutorService blockingTaskExecutor = config.blockingTaskExecutor();
if (blockingTaskExecutor instanceof InterminableExecutorService) {
executor = ((InterminableExecutorService) blockingTaskExecutor).getExecutorService();
final ScheduledExecutorService executor;
final ScheduledExecutorService blockingTaskExecutor = config.blockingTaskExecutor();
if (blockingTaskExecutor instanceof UnstoppableScheduledExecutorService) {
executor =
((UnstoppableScheduledExecutorService) blockingTaskExecutor).getExecutorService();
} else {
executor = blockingTaskExecutor;
}
Expand Down
23 changes: 7 additions & 16 deletions core/src/main/java/com/linecorp/armeria/server/ServerBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
Expand Down Expand Up @@ -177,7 +178,7 @@ public final class ServerBuilder {
private int proxyProtocolMaxTlvSize = PROXY_PROTOCOL_DEFAULT_MAX_TLV_SIZE;
private Duration gracefulShutdownQuietPeriod = DEFAULT_GRACEFUL_SHUTDOWN_QUIET_PERIOD;
private Duration gracefulShutdownTimeout = DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT;
private Executor blockingTaskExecutor = CommonPools.blockingTaskExecutor();
private ScheduledExecutorService blockingTaskExecutor = CommonPools.blockingTaskExecutor();
private boolean shutdownBlockingTaskExecutorOnStop;
private MeterRegistry meterRegistry = Metrics.globalRegistry;
private String serviceLoggerPrefix = DEFAULT_SERVICE_LOGGER_PREFIX;
Expand Down Expand Up @@ -624,24 +625,14 @@ public ServerBuilder gracefulShutdownTimeout(Duration quietPeriod, Duration time
}

/**
* Sets the {@link Executor} dedicated to the execution of blocking tasks or invocations.
* Sets the {@link ScheduledExecutorService} dedicated to the execution of blocking tasks or invocations.
* If not set, {@linkplain CommonPools#blockingTaskExecutor() the common pool} is used.
*
* @deprecated Use {@link #blockingTaskExecutor(Executor, boolean)}.
*
*/
@Deprecated
public ServerBuilder blockingTaskExecutor(Executor blockingTaskExecutor) {
return blockingTaskExecutor(blockingTaskExecutor, false);
}

/**
* Sets the {@link Executor} dedicated to the execution of blocking tasks or invocations.
* If not set, {@linkplain CommonPools#blockingTaskExecutor() the common pool} is used.
*
* @param shutdownOnStop whether to shut down the {@link Executor} when the {@link Server} stops
* @param shutdownOnStop whether to shut down the {@link ScheduledExecutorService} when the
* {@link Server} stops
*/
public ServerBuilder blockingTaskExecutor(Executor blockingTaskExecutor, boolean shutdownOnStop) {
public ServerBuilder blockingTaskExecutor(ScheduledExecutorService blockingTaskExecutor,
boolean shutdownOnStop) {
this.blockingTaskExecutor = requireNonNull(blockingTaskExecutor, "blockingTaskExecutor");
shutdownBlockingTaskExecutorOnStop = shutdownOnStop;
return this;
Expand Down
33 changes: 11 additions & 22 deletions core/src/main/java/com/linecorp/armeria/server/ServerConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Predicate;
import java.util.function.Supplier;

Expand All @@ -42,7 +42,6 @@

import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.binder.jvm.ExecutorServiceMetrics;
import io.micrometer.core.instrument.internal.TimedExecutor;
import io.micrometer.core.instrument.internal.TimedExecutorService;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
Expand Down Expand Up @@ -86,7 +85,7 @@ public final class ServerConfig {
private final Duration gracefulShutdownQuietPeriod;
private final Duration gracefulShutdownTimeout;

private final ExecutorService blockingTaskExecutor;
private final ScheduledExecutorService blockingTaskExecutor;
private final boolean shutdownBlockingTaskExecutorOnStop;

private final MeterRegistry meterRegistry;
Expand Down Expand Up @@ -117,7 +116,7 @@ public final class ServerConfig {
long http2MaxStreamsPerConnection, int http2MaxFrameSize, long http2MaxHeaderListSize,
int http1MaxInitialLineLength, int http1MaxHeaderSize, int http1MaxChunkSize,
Duration gracefulShutdownQuietPeriod, Duration gracefulShutdownTimeout,
Executor blockingTaskExecutor, boolean shutdownBlockingTaskExecutorOnStop,
ScheduledExecutorService blockingTaskExecutor, boolean shutdownBlockingTaskExecutorOnStop,
MeterRegistry meterRegistry, String serviceLoggerPrefix,
int proxyProtocolMaxTlvSize,
Map<ChannelOption<?>, Object> channelOptions,
Expand Down Expand Up @@ -157,21 +156,11 @@ public final class ServerConfig {
gracefulShutdownQuietPeriod, "gracefulShutdownQuietPeriod");

requireNonNull(blockingTaskExecutor, "blockingTaskExecutor");
if (blockingTaskExecutor instanceof ExecutorService) {
ExecutorService taskExecutor = (ExecutorService) blockingTaskExecutor;
if (!(blockingTaskExecutor instanceof TimedExecutorService)) {
taskExecutor = ExecutorServiceMetrics.monitor(meterRegistry, taskExecutor,
"armeriaBlockingTaskExecutor");
}
this.blockingTaskExecutor = new InterminableExecutorService(taskExecutor);
} else {
Executor taskExecutor = blockingTaskExecutor;
if (!(blockingTaskExecutor instanceof TimedExecutor)) {
taskExecutor = ExecutorServiceMetrics.monitor(meterRegistry, taskExecutor,
"armeriaBlockingTaskExecutor");
}
this.blockingTaskExecutor = new ExecutorBasedExecutorService(taskExecutor);
if (!(blockingTaskExecutor instanceof TimedExecutorService)) {
blockingTaskExecutor = ExecutorServiceMetrics.monitor(meterRegistry, blockingTaskExecutor,
"armeriaBlockingTaskExecutor");
}
this.blockingTaskExecutor = UnstoppableScheduledExecutorService.from(blockingTaskExecutor);
this.shutdownBlockingTaskExecutorOnStop = shutdownBlockingTaskExecutorOnStop;

this.meterRegistry = requireNonNull(meterRegistry, "meterRegistry");
Expand Down Expand Up @@ -555,12 +544,12 @@ public Duration gracefulShutdownTimeout() {
}

/**
* Returns the {@link ExecutorService} dedicated to the execution of blocking tasks or invocations.
* Note that the {@link ExecutorService} returned by this method does not set the
* Returns the {@link ScheduledExecutorService} dedicated to the execution of blocking tasks or invocations.
* Note that the {@link ScheduledExecutorService} returned by this method does not set the
* {@link ServiceRequestContext} when executing a submitted task.
* Use {@link ServiceRequestContext#blockingTaskExecutor()} if possible.
*/
public ExecutorService blockingTaskExecutor() {
public ScheduledExecutorService blockingTaskExecutor() {
return blockingTaskExecutor;
}

Expand Down Expand Up @@ -689,7 +678,7 @@ static String toString(
long http2MaxHeaderListSize, long http1MaxInitialLineLength, long http1MaxHeaderSize,
long http1MaxChunkSize, int proxyProtocolMaxTlvSize,
Duration gracefulShutdownQuietPeriod, Duration gracefulShutdownTimeout,
Executor blockingTaskExecutor, boolean shutdownBlockingTaskExecutorOnStop,
ScheduledExecutorService blockingTaskExecutor, boolean shutdownBlockingTaskExecutorOnStop,
@Nullable MeterRegistry meterRegistry, String serviceLoggerPrefix,
Map<ChannelOption<?>, ?> channelOptions, Map<ChannelOption<?>, ?> childChannelOptions,
List<ClientAddressSource> clientAddressSources,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import java.time.Duration;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Function;
import java.util.function.Supplier;

Expand Down Expand Up @@ -211,14 +211,14 @@ default String pathParam(String name) {
HttpService service();

/**
* Returns the {@link ExecutorService} that could be used for executing a potentially long-running task.
* The {@link ExecutorService} will propagate the {@link ServiceRequestContext} automatically when running
* a task.
* Returns the {@link ScheduledExecutorService} that could be used for executing a potentially
* long-running task. The {@link ScheduledExecutorService} will propagate the {@link ServiceRequestContext}
* automatically when running a task.
*
* <p>Note that performing a long-running task in {@link Service#serve(ServiceRequestContext, Request)}
* may block the {@link Server}'s I/O event loop and thus should be executed in other threads.
*/
ExecutorService blockingTaskExecutor();
ScheduledExecutorService blockingTaskExecutor();

/**
* Returns the {@link #path()} with its context path removed. This method can be useful for a reusable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import java.time.Duration;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
Expand Down Expand Up @@ -112,7 +112,7 @@ public HttpService service() {
}

@Override
public ExecutorService blockingTaskExecutor() {
public ScheduledExecutorService blockingTaskExecutor() {
return delegate().blockingTaskExecutor();
}

Expand Down
Loading

0 comments on commit 964b7a5

Please sign in to comment.