Skip to content

Commit

Permalink
Deduplicate timeout code in RequestContext implementations (#2567)
Browse files Browse the repository at this point in the history
Motivation:
`DefaultClientRequestContext` and `DefaultServiceRequestContext` have duplicate code.
It is easy to make mistakes.

Modifications:
* Add `TimeoutScheduler` and migrate duplicate code
* Remove `timedOut` flag in `DefaultServiceRequestContext` and delegate to `RequestTimeoutScheduler`
* Increase `globalTimeout` in `BraveClientIntegrationTest` (still flaky)

Results:
Cleaner code
ikhoon authored Mar 10, 2020
1 parent f0afb37 commit 0efa85c
Showing 7 changed files with 255 additions and 296 deletions.
Original file line number Diff line number Diff line change
@@ -54,7 +54,7 @@
public class BraveClientIntegrationTest extends ITHttpAsyncClient<WebClient> {

@Rule(order = Integer.MAX_VALUE)
public TestRule globalTimeout = new DisableOnDebug(Timeout.seconds(10));
public TestRule globalTimeout = new DisableOnDebug(Timeout.seconds(15));

@Parameters
public static List<SessionProtocol> sessionProtocols() {
Original file line number Diff line number Diff line change
@@ -24,15 +24,12 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map.Entry;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.Consumer;

import javax.annotation.Nullable;
import javax.net.ssl.SSLSession;

import com.google.common.math.LongMath;

import com.linecorp.armeria.client.endpoint.EndpointGroup;
import com.linecorp.armeria.common.CommonPools;
import com.linecorp.armeria.common.HttpHeaders;
@@ -54,6 +51,7 @@
import com.linecorp.armeria.common.util.ReleasableHolder;
import com.linecorp.armeria.common.util.UnstableApi;
import com.linecorp.armeria.internal.common.TimeoutController;
import com.linecorp.armeria.internal.common.TimeoutScheduler;
import com.linecorp.armeria.server.ServiceRequestContext;

import io.micrometer.core.instrument.MeterRegistry;
@@ -89,15 +87,11 @@ public final class DefaultClientRequestContext
private final ServiceRequestContext root;

private final RequestLogBuilder log;
private final TimeoutScheduler timeoutScheduler;

private long writeTimeoutMillis;
private long responseTimeoutMillis;
@Nullable
private Runnable responseTimeoutHandler;
@Nullable
private TimeoutController responseTimeoutController;
@Nullable
private Consumer<TimeoutController> pendingTimeoutTask;
private long maxResponseLength;

@SuppressWarnings("FieldMayBeFinal") // Updated via `additionalRequestHeadersUpdater`
@@ -174,9 +168,9 @@ private DefaultClientRequestContext(

log = RequestLog.builder(this);
log.startRequest(requestStartTimeNanos, requestStartTimeMicros);
timeoutScheduler = new TimeoutScheduler(options.responseTimeoutMillis());

writeTimeoutMillis = options.writeTimeoutMillis();
responseTimeoutMillis = options.responseTimeoutMillis();
maxResponseLength = options.maxResponseLength();
additionalRequestHeaders = options.get(ClientOption.HTTP_HEADERS);
customizers = copyThreadLocalCustomizers();
@@ -310,9 +304,9 @@ private DefaultClientRequestContext(DefaultClientRequestContext ctx,
root = ctx.root();

log = RequestLog.builder(this);
timeoutScheduler = new TimeoutScheduler(ctx.responseTimeoutMillis());

writeTimeoutMillis = ctx.writeTimeoutMillis();
responseTimeoutMillis = ctx.responseTimeoutMillis();
maxResponseLength = ctx.maxResponseLength();
additionalRequestHeaders = ctx.additionalRequestHeaders();

@@ -428,45 +422,17 @@ public void setWriteTimeout(Duration writeTimeout) {

@Override
public long responseTimeoutMillis() {
return responseTimeoutMillis;
return timeoutScheduler.timeoutMillis();
}

@Override
public void clearResponseTimeout() {
if (responseTimeoutMillis == 0) {
return;
}

final TimeoutController responseTimeoutController = this.responseTimeoutController;
responseTimeoutMillis = 0;
if (responseTimeoutController != null) {
if (eventLoop().inEventLoop()) {
responseTimeoutController.cancelTimeout();
} else {
eventLoop().execute(responseTimeoutController::cancelTimeout);
}
} else {
addPendingTimeoutTask(TimeoutController::cancelTimeout);
}
timeoutScheduler.clearTimeout();
}

@Override
public void setResponseTimeoutMillis(long responseTimeoutMillis) {
checkArgument(responseTimeoutMillis >= 0,
"responseTimeoutMillis: %s (expected: >= 0)", responseTimeoutMillis);
if (responseTimeoutMillis == 0) {
clearResponseTimeout();
return;
}

if (this.responseTimeoutMillis == 0) {
setResponseTimeoutAfterMillis(responseTimeoutMillis);
return;
}

final long adjustmentMillis =
LongMath.saturatedSubtract(responseTimeoutMillis, this.responseTimeoutMillis);
extendResponseTimeoutMillis(adjustmentMillis);
timeoutScheduler.setTimeoutMillis(responseTimeoutMillis);
}

@Override
@@ -476,22 +442,7 @@ public void setResponseTimeout(Duration responseTimeout) {

@Override
public void extendResponseTimeoutMillis(long adjustmentMillis) {
if (adjustmentMillis == 0 || responseTimeoutMillis == 0) {
return;
}

final long oldResponseTimeoutMillis = responseTimeoutMillis;
responseTimeoutMillis = LongMath.saturatedAdd(oldResponseTimeoutMillis, adjustmentMillis);
final TimeoutController responseTimeoutController = this.responseTimeoutController;
if (responseTimeoutController != null) {
if (eventLoop().inEventLoop()) {
responseTimeoutController.extendTimeout(adjustmentMillis);
} else {
eventLoop().execute(() -> responseTimeoutController.extendTimeout(adjustmentMillis));
}
} else {
addPendingTimeoutTask(timeoutController -> timeoutController.extendTimeout(adjustmentMillis));
}
timeoutScheduler.extendTimeoutMillis(adjustmentMillis);
}

@Override
@@ -501,32 +452,7 @@ public void extendResponseTimeout(Duration adjustment) {

@Override
public void setResponseTimeoutAfterMillis(long responseTimeoutMillis) {
checkArgument(responseTimeoutMillis > 0,
"responseTimeoutMillis: %s (expected: > 0)", responseTimeoutMillis);

long passedTimeMillis = 0;
final TimeoutController responseTimeoutController = this.responseTimeoutController;
if (responseTimeoutController != null) {
final Long startTimeNanos = responseTimeoutController.startTimeNanos();
if (startTimeNanos != null) {
passedTimeMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTimeNanos);
}
if (eventLoop().inEventLoop()) {
responseTimeoutController.resetTimeout(responseTimeoutMillis);
} else {
eventLoop().execute(() -> responseTimeoutController.resetTimeout(responseTimeoutMillis));
}
} else {
final long startTimeNanos = System.nanoTime();
addPendingTimeoutTask(timeoutController -> {
final long passedTimeMillis0 =
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTimeNanos);
final long timeoutMillis = Math.max(1, responseTimeoutMillis - passedTimeMillis0);
timeoutController.resetTimeout(timeoutMillis);
});
}

this.responseTimeoutMillis = LongMath.saturatedAdd(passedTimeMillis, responseTimeoutMillis);
timeoutScheduler.setTimeoutAfterMillis(responseTimeoutMillis);
}

@Override
@@ -536,24 +462,7 @@ public void setResponseTimeoutAfter(Duration responseTimeout) {

@Override
public void setResponseTimeoutAtMillis(long responseTimeoutAtMillis) {
checkArgument(responseTimeoutAtMillis >= 0,
"responseTimeoutAtMillis: %s (expected: >= 0)", responseTimeoutMillis);
final long responseTimeoutAfter = responseTimeoutAtMillis - System.currentTimeMillis();

if (responseTimeoutAfter <= 0) {
final TimeoutController responseTimeoutController = this.responseTimeoutController;
if (responseTimeoutController != null) {
if (eventLoop().inEventLoop()) {
responseTimeoutController.timeoutNow();
} else {
eventLoop().execute(responseTimeoutController::timeoutNow);
}
} else {
addPendingTimeoutTask(TimeoutController::timeoutNow);
}
} else {
setResponseTimeoutAfterMillis(responseTimeoutAfter);
}
timeoutScheduler.setTimeoutAtMillis(responseTimeoutAtMillis);
}

@Override
@@ -580,27 +489,7 @@ public void setResponseTimeoutHandler(Runnable responseTimeoutHandler) {
* a timeout task when a user updates the response timeout configuration.
*/
void setResponseTimeoutController(TimeoutController responseTimeoutController) {
requireNonNull(responseTimeoutController, "responseTimeoutController");
checkState(this.responseTimeoutController == null, "responseTimeoutController is set already.");
this.responseTimeoutController = responseTimeoutController;

// Invoke pending timeout task which was set before initializing responseTimeoutController
final Consumer<TimeoutController> pendingTimeoutTask = this.pendingTimeoutTask;
if (pendingTimeoutTask != null) {
if (eventLoop().inEventLoop()) {
pendingTimeoutTask.accept(responseTimeoutController);
} else {
eventLoop().execute(() -> pendingTimeoutTask.accept(responseTimeoutController));
}
}
}

private void addPendingTimeoutTask(Consumer<TimeoutController> pendingTimeoutTask) {
if (this.pendingTimeoutTask == null) {
this.pendingTimeoutTask = pendingTimeoutTask;
} else {
this.pendingTimeoutTask = this.pendingTimeoutTask.andThen(pendingTimeoutTask);
}
timeoutScheduler.setTimeoutController(responseTimeoutController, eventLoop);
}

@Override
Loading
Oops, something went wrong.

0 comments on commit 0efa85c

Please sign in to comment.