Skip to content

Commit

Permalink
Merge branch 'master' into java-configurable-ready-threshold
Browse files Browse the repository at this point in the history
  • Loading branch information
jduo committed Mar 21, 2024
2 parents 7cf43d2 + 68eb639 commit 63a454d
Show file tree
Hide file tree
Showing 3 changed files with 95 additions and 80 deletions.
167 changes: 92 additions & 75 deletions core/src/main/java/io/grpc/internal/ClientCallImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import static io.grpc.internal.GrpcUtil.CONTENT_LENGTH_KEY;
import static io.grpc.internal.GrpcUtil.MESSAGE_ACCEPT_ENCODING_KEY;
import static io.grpc.internal.GrpcUtil.MESSAGE_ENCODING_KEY;
import static java.lang.Math.max;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.MoreObjects;
Expand Down Expand Up @@ -61,7 +62,6 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nullable;
Expand All @@ -82,13 +82,16 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
private final boolean callExecutorIsDirect;
private final CallTracer channelCallsTracer;
private final Context context;
private CancellationHandler cancellationHandler;
private volatile ScheduledFuture<?> deadlineCancellationFuture;
private final boolean unaryRequest;
private CallOptions callOptions;
private ClientStream stream;
private volatile boolean cancelListenersShouldBeRemoved;
private boolean cancelCalled;
private boolean halfCloseCalled;
private final ClientStreamProvider clientStreamProvider;
private final ContextCancellationListener cancellationListener =
new ContextCancellationListener();
private final ScheduledExecutorService deadlineCancellationExecutor;
private boolean fullStreamDecompression;
private DecompressorRegistry decompressorRegistry = DecompressorRegistry.getDefaultInstance();
Expand Down Expand Up @@ -125,6 +128,13 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
PerfMark.event("ClientCall.<init>", tag);
}

private final class ContextCancellationListener implements CancellationListener {
@Override
public void cancelled(Context context) {
stream.cancel(statusFromCancelled(context));
}
}

/**
* Provider of {@link ClientStream}s.
*/
Expand Down Expand Up @@ -242,21 +252,21 @@ public void runInContext() {
prepareHeaders(headers, decompressorRegistry, compressor, fullStreamDecompression);

Deadline effectiveDeadline = effectiveDeadline();
boolean contextIsDeadlineSource = effectiveDeadline != null
&& effectiveDeadline.equals(context.getDeadline());
cancellationHandler = new CancellationHandler(effectiveDeadline, contextIsDeadlineSource);
boolean deadlineExceeded = effectiveDeadline != null && cancellationHandler.remainingNanos <= 0;
boolean deadlineExceeded = effectiveDeadline != null && effectiveDeadline.isExpired();
if (!deadlineExceeded) {
logIfContextNarrowedTimeout(
effectiveDeadline, context.getDeadline(), callOptions.getDeadline());
stream = clientStreamProvider.newStream(method, callOptions, headers, context);
} else {
ClientStreamTracer[] tracers =
GrpcUtil.getClientStreamTracers(callOptions, headers, 0, false);
String deadlineName = contextIsDeadlineSource ? "Context" : "CallOptions";
String deadlineName =
isFirstMin(callOptions.getDeadline(), context.getDeadline()) ? "CallOptions" : "Context";
Long nameResolutionDelay = callOptions.getOption(NAME_RESOLUTION_DELAYED);
String description = String.format(
"ClientCall started after %s deadline was exceeded %.9f seconds ago. "
+ "Name resolution delay %.9f seconds.", deadlineName,
cancellationHandler.remainingNanos / NANO_TO_SECS,
effectiveDeadline.timeRemaining(TimeUnit.NANOSECONDS) / NANO_TO_SECS,
nameResolutionDelay == null ? 0 : nameResolutionDelay / NANO_TO_SECS);
stream = new FailingClientStream(DEADLINE_EXCEEDED.withDescription(description), tracers);
}
Expand Down Expand Up @@ -288,7 +298,21 @@ public void runInContext() {
// they receive cancel before start. Issue #1343 has more details

// Propagate later Context cancellation to the remote side.
cancellationHandler.setUp();
context.addListener(cancellationListener, directExecutor());
if (effectiveDeadline != null
// If the context has the effective deadline, we don't need to schedule an extra task.
&& !effectiveDeadline.equals(context.getDeadline())
// If the channel has been terminated, we don't need to schedule an extra task.
&& deadlineCancellationExecutor != null) {
deadlineCancellationFuture = startDeadlineTimer(effectiveDeadline);
}
if (cancelListenersShouldBeRemoved) {
// Race detected! ClientStreamListener.closed may have been called before
// deadlineCancellationFuture was set / context listener added, thereby preventing the future
// and listener from being cancelled. Go ahead and cancel again, just to be sure it
// was cancelled.
removeContextListenerAndCancelDeadlineFuture();
}
}

private void applyMethodConfig() {
Expand Down Expand Up @@ -330,96 +354,75 @@ private void applyMethodConfig() {
}
}

private final class CancellationHandler implements Runnable, CancellationListener {
private final boolean contextIsDeadlineSource;
private final boolean hasDeadline;
private final long remainingNanos;
private volatile ScheduledFuture<?> deadlineCancellationFuture;
private volatile boolean tearDownCalled;

CancellationHandler(Deadline deadline, boolean contextIsDeadlineSource) {
this.contextIsDeadlineSource = contextIsDeadlineSource;
if (deadline == null) {
hasDeadline = false;
remainingNanos = 0;
} else {
hasDeadline = true;
remainingNanos = deadline.timeRemaining(TimeUnit.NANOSECONDS);
}
private static void logIfContextNarrowedTimeout(
Deadline effectiveDeadline, @Nullable Deadline outerCallDeadline,
@Nullable Deadline callDeadline) {
if (!log.isLoggable(Level.FINE) || effectiveDeadline == null
|| !effectiveDeadline.equals(outerCallDeadline)) {
return;
}

void setUp() {
if (tearDownCalled) {
return;
}
if (hasDeadline
// If the context has the effective deadline, we don't need to schedule an extra task.
&& !contextIsDeadlineSource
// If the channel has been terminated, we don't need to schedule an extra task.
&& deadlineCancellationExecutor != null) {
deadlineCancellationFuture = deadlineCancellationExecutor.schedule(
new LogExceptionRunnable(this), remainingNanos, TimeUnit.NANOSECONDS);
}
context.addListener(this, directExecutor());
if (tearDownCalled) {
// Race detected! Re-run to make sure the future is cancelled and context listener removed
tearDown();
}
long effectiveTimeout = max(0, effectiveDeadline.timeRemaining(TimeUnit.NANOSECONDS));
StringBuilder builder = new StringBuilder(String.format(
Locale.US,
"Call timeout set to '%d' ns, due to context deadline.", effectiveTimeout));
if (callDeadline == null) {
builder.append(" Explicit call timeout was not set.");
} else {
long callTimeout = callDeadline.timeRemaining(TimeUnit.NANOSECONDS);
builder.append(String.format(Locale.US, " Explicit call timeout was '%d' ns.", callTimeout));
}

// May be called multiple times, and race with setUp()
void tearDown() {
tearDownCalled = true;
ScheduledFuture<?> deadlineCancellationFuture = this.deadlineCancellationFuture;
if (deadlineCancellationFuture != null) {
deadlineCancellationFuture.cancel(false);
}
context.removeListener(this);
log.fine(builder.toString());
}

private void removeContextListenerAndCancelDeadlineFuture() {
context.removeListener(cancellationListener);
ScheduledFuture<?> f = deadlineCancellationFuture;
if (f != null) {
f.cancel(false);
}
}

@Override
public void cancelled(Context context) {
if (hasDeadline && contextIsDeadlineSource
&& context.cancellationCause() instanceof TimeoutException) {
stream.cancel(formatDeadlineExceededStatus());
return;
}
stream.cancel(statusFromCancelled(context));
private class DeadlineTimer implements Runnable {
private final long remainingNanos;

DeadlineTimer(long remainingNanos) {
this.remainingNanos = remainingNanos;
}

@Override
public void run() {
stream.cancel(formatDeadlineExceededStatus());
}

Status formatDeadlineExceededStatus() {
InsightBuilder insight = new InsightBuilder();
stream.appendTimeoutInsight(insight);
// DelayedStream.cancel() is safe to call from a thread that is different from where the
// stream is created.
long seconds = Math.abs(remainingNanos) / TimeUnit.SECONDS.toNanos(1);
long nanos = Math.abs(remainingNanos) % TimeUnit.SECONDS.toNanos(1);

StringBuilder buf = new StringBuilder();
buf.append(contextIsDeadlineSource ? "Context" : "CallOptions");
buf.append(" deadline exceeded after ");
buf.append("deadline exceeded after ");
if (remainingNanos < 0) {
buf.append('-');
}
buf.append(seconds);
buf.append(String.format(Locale.US, ".%09d", nanos));
buf.append("s. ");
Long nsDelay = callOptions.getOption(NAME_RESOLUTION_DELAYED);
buf.append(String.format(Locale.US, "Name resolution delay %.9f seconds.",
buf.append(String.format(Locale.US, "Name resolution delay %.9f seconds. ",
nsDelay == null ? 0 : nsDelay / NANO_TO_SECS));
if (stream != null) {
InsightBuilder insight = new InsightBuilder();
stream.appendTimeoutInsight(insight);
buf.append(" ");
buf.append(insight);
}
return DEADLINE_EXCEEDED.withDescription(buf.toString());
buf.append(insight);
stream.cancel(DEADLINE_EXCEEDED.augmentDescription(buf.toString()));
}
}

private ScheduledFuture<?> startDeadlineTimer(Deadline deadline) {
long remainingNanos = deadline.timeRemaining(TimeUnit.NANOSECONDS);
return deadlineCancellationExecutor.schedule(
new LogExceptionRunnable(
new DeadlineTimer(remainingNanos)), remainingNanos, TimeUnit.NANOSECONDS);
}

@Nullable
private Deadline effectiveDeadline() {
// Call options and context are immutable, so we don't need to cache the deadline.
Expand All @@ -437,6 +440,16 @@ private static Deadline min(@Nullable Deadline deadline0, @Nullable Deadline dea
return deadline0.minimum(deadline1);
}

private static boolean isFirstMin(@Nullable Deadline deadline0, @Nullable Deadline deadline1) {
if (deadline0 == null) {
return false;
}
if (deadline1 == null) {
return true;
}
return deadline0.isBefore(deadline1);
}

@Override
public void request(int numMessages) {
try (TaskCloseable ignore = PerfMark.traceTask("ClientCall.request")) {
Expand Down Expand Up @@ -480,7 +493,7 @@ private void cancelInternal(@Nullable String message, @Nullable Throwable cause)
stream.cancel(status);
}
} finally {
cancellationHandler.tearDown();
removeContextListenerAndCancelDeadlineFuture();
}
}

Expand Down Expand Up @@ -686,7 +699,10 @@ private void closedInternal(
// description. Since our timer may be delayed in firing, we double-check the deadline and
// turn the failure into the likely more helpful DEADLINE_EXCEEDED status.
if (deadline.isExpired()) {
status = cancellationHandler.formatDeadlineExceededStatus();
InsightBuilder insight = new InsightBuilder();
stream.appendTimeoutInsight(insight);
status = DEADLINE_EXCEEDED.augmentDescription(
"ClientCall was cancelled at or after deadline. " + insight);
// Replace trailers to prevent mixing sources of status and trailers.
trailers = new Metadata();
}
Expand All @@ -709,7 +725,6 @@ public void runInContext() {
}

private void runInternal() {
cancellationHandler.tearDown();
Status status = savedStatus;
Metadata trailers = savedTrailers;
if (exceptionStatus != null) {
Expand All @@ -722,9 +737,11 @@ private void runInternal() {
// Replace trailers to prevent mixing sources of status and trailers.
trailers = new Metadata();
}
cancelListenersShouldBeRemoved = true;
try {
closeObserver(observer, status, trailers);
} finally {
removeContextListenerAndCancelDeadlineFuture();
channelCallsTracer.reportCallEnded(status.isOk());
}
}
Expand Down
6 changes: 2 additions & 4 deletions core/src/test/java/io/grpc/internal/ClientCallImplTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -926,7 +926,7 @@ public void expiredDeadlineCancelsStream_CallOptions() {
verify(stream, times(1)).cancel(statusCaptor.capture());
assertEquals(Status.Code.DEADLINE_EXCEEDED, statusCaptor.getValue().getCode());
assertThat(statusCaptor.getValue().getDescription())
.matches("CallOptions deadline exceeded after [0-9]+\\.[0-9]+s. "
.matches("deadline exceeded after [0-9]+\\.[0-9]+s. "
+ "Name resolution delay 0.000000000 seconds. \\[remote_addr=127\\.0\\.0\\.1:443\\]");
}

Expand Down Expand Up @@ -954,9 +954,7 @@ public void expiredDeadlineCancelsStream_Context() {

verify(stream, times(1)).cancel(statusCaptor.capture());
assertEquals(Status.Code.DEADLINE_EXCEEDED, statusCaptor.getValue().getCode());
assertThat(statusCaptor.getValue().getDescription())
.matches("Context deadline exceeded after [0-9]+\\.[0-9]+s. "
+ "Name resolution delay 0.000000000 seconds. \\[remote_addr=127\\.0\\.0\\.1:443\\]");
assertThat(statusCaptor.getValue().getDescription()).isEqualTo("context timed out");
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1190,7 +1190,7 @@ public void deadlineExceeded() throws Exception {
assertTrue(desc,
// There is a race between client and server-side deadline expiration.
// If client expires first, it'd generate this message
Pattern.matches("CallOptions deadline exceeded after .*s. \\[.*\\]", desc)
Pattern.matches("deadline exceeded after .*s. \\[.*\\]", desc)
// If server expires first, it'd reset the stream and client would generate a different
// message
|| desc.startsWith("ClientCall was cancelled at or after deadline."));
Expand Down

0 comments on commit 63a454d

Please sign in to comment.