Skip to content

Commit

Permalink
stub: ClientCalls.ThreadlessExecutor throws rejectedExecutorException…
Browse files Browse the repository at this point in the history
… disable by default (#8973)

* stub: Have ClientCalls.ThreadlessExecutor reject Runnables after end of RPC

Changes originally proposed as part of #7106.

Fixes #3557

* add environment variable rejectExecutedException

Co-authored-by: Nick Hill <nickhill@us.ibm.com>
  • Loading branch information
YifeiZhuang and njhill authored Mar 8, 2022
1 parent 1d4eb49 commit 299851d
Show file tree
Hide file tree
Showing 2 changed files with 93 additions and 7 deletions.
49 changes: 42 additions & 7 deletions stub/src/main/java/io/grpc/stub/ClientCalls.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.util.concurrent.AbstractFuture;
import com.google.common.util.concurrent.ListenableFuture;
import io.grpc.CallOptions;
Expand All @@ -39,6 +41,7 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.locks.LockSupport;
import java.util.logging.Level;
import java.util.logging.Logger;
Expand All @@ -53,6 +56,11 @@ public final class ClientCalls {

private static final Logger logger = Logger.getLogger(ClientCalls.class.getName());

@VisibleForTesting
static boolean rejectRunnableOnExecutor =
!Strings.isNullOrEmpty(System.getenv("GRPC_CLIENT_CALL_REJECT_RUNNABLE"))
&& Boolean.parseBoolean(System.getenv("GRPC_CLIENT_CALL_REJECT_RUNNABLE"));

// Prevent instantiation
private ClientCalls() {}

Expand Down Expand Up @@ -161,6 +169,7 @@ public static <ReqT, RespT> RespT blockingUnaryCall(
// Something very bad happened. All bets are off; it may be dangerous to wait for onClose().
throw cancelThrow(call, e);
} finally {
executor.shutdown();
if (interrupt) {
Thread.currentThread().interrupt();
}
Expand Down Expand Up @@ -626,6 +635,9 @@ private Object waitForNext() {
// Now wait for onClose() to be called, so interceptors can clean up
}
}
if (next == this || next instanceof StatusRuntimeException) {
threadless.shutdown();
}
return next;
}
} finally {
Expand Down Expand Up @@ -712,7 +724,10 @@ private static final class ThreadlessExecutor extends ConcurrentLinkedQueue<Runn
implements Executor {
private static final Logger log = Logger.getLogger(ThreadlessExecutor.class.getName());

private volatile Thread waiter;
private static final Object SHUTDOWN = new Object(); // sentinel

// Set to the calling thread while it's parked, SHUTDOWN on RPC completion
private volatile Object waiter;

// Non private to avoid synthetic class
ThreadlessExecutor() {}
Expand All @@ -736,14 +751,29 @@ public void waitAndDrain() throws InterruptedException {
}
}
do {
try {
runnable.run();
} catch (Throwable t) {
log.log(Level.WARNING, "Runnable threw exception", t);
}
runQuietly(runnable);
} while ((runnable = poll()) != null);
}

/**
* Called after final call to {@link #waitAndDrain()}, from same thread.
*/
public void shutdown() {
waiter = SHUTDOWN;
Runnable runnable;
while ((runnable = poll()) != null) {
runQuietly(runnable);
}
}

private static void runQuietly(Runnable runnable) {
try {
runnable.run();
} catch (Throwable t) {
log.log(Level.WARNING, "Runnable threw exception", t);
}
}

private static void throwIfInterrupted() throws InterruptedException {
if (Thread.interrupted()) {
throw new InterruptedException();
Expand All @@ -753,7 +783,12 @@ private static void throwIfInterrupted() throws InterruptedException {
@Override
public void execute(Runnable runnable) {
add(runnable);
LockSupport.unpark(waiter); // no-op if null
Object waiter = this.waiter;
if (waiter != SHUTDOWN) {
LockSupport.unpark((Thread) waiter); // no-op if null
} else if (remove(runnable) && rejectRunnableOnExecutor) {
throw new RejectedExecutionException();
}
}
}

Expand Down
51 changes: 51 additions & 0 deletions stub/src/test/java/io/grpc/stub/ClientCallsTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import static com.google.common.truth.Truth.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
Expand Down Expand Up @@ -58,6 +59,8 @@
import java.util.concurrent.CancellationException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
Expand Down Expand Up @@ -97,10 +100,12 @@ public class ClientCallsTest {
private ArgumentCaptor<MethodDescriptor<?, ?>> methodDescriptorCaptor;
@Captor
private ArgumentCaptor<CallOptions> callOptionsCaptor;
private boolean originalRejectRunnableOnExecutor;

@Before
public void setUp() {
MockitoAnnotations.initMocks(this);
originalRejectRunnableOnExecutor = ClientCalls.rejectRunnableOnExecutor;
}

@After
Expand All @@ -111,6 +116,7 @@ public void tearDown() {
if (channel != null) {
channel.shutdownNow();
}
ClientCalls.rejectRunnableOnExecutor = originalRejectRunnableOnExecutor;
}

@Test
Expand Down Expand Up @@ -217,6 +223,49 @@ class NoopUnaryMethod implements UnaryMethod<Integer, Integer> {
assertTrue("context not cancelled", methodImpl.observer.isCancelled());
}

@Test
public void blockingUnaryCall2_rejectExecutionOnClose() throws Exception {
Integer req = 2;

class NoopUnaryMethod implements UnaryMethod<Integer, Integer> {
ServerCallStreamObserver<Integer> observer;

@Override public void invoke(Integer request, StreamObserver<Integer> responseObserver) {
observer = (ServerCallStreamObserver<Integer>) responseObserver;
}
}

NoopUnaryMethod methodImpl = new NoopUnaryMethod();
server = InProcessServerBuilder.forName("noop").directExecutor()
.addService(ServerServiceDefinition.builder("some")
.addMethod(UNARY_METHOD, ServerCalls.asyncUnaryCall(methodImpl))
.build())
.build().start();

InterruptInterceptor interceptor = new InterruptInterceptor();
channel = InProcessChannelBuilder.forName("noop")
.directExecutor()
.intercept(interceptor)
.build();
try {
ClientCalls.blockingUnaryCall(channel, UNARY_METHOD, CallOptions.DEFAULT, req);
fail();
} catch (StatusRuntimeException ex) {
assertTrue(Thread.interrupted());
assertTrue("interrupted", ex.getCause() instanceof InterruptedException);
}
assertTrue("onCloseCalled", interceptor.onCloseCalled);
assertTrue("context not cancelled", methodImpl.observer.isCancelled());
assertNotNull("callOptionsExecutor should not be null", interceptor.savedExecutor);
ClientCalls.rejectRunnableOnExecutor = true;
try {
interceptor.savedExecutor.execute(() -> { });
fail();
} catch (Exception ex) {
assertTrue(ex instanceof RejectedExecutionException);
}
}

@Test
public void blockingUnaryCall_HasBlockingStubType() {
NoopClientCall<Integer, Integer> call = new NoopClientCall<Integer, Integer>() {
Expand Down Expand Up @@ -858,6 +907,7 @@ class NoopServerStreamingMethod implements ServerStreamingMethod<Integer, Intege
// Used for blocking tests to check interrupt behavior and make sure onClose is still called.
class InterruptInterceptor implements ClientInterceptor {
boolean onCloseCalled;
Executor savedExecutor;

@Override
public <ReqT,RespT> ClientCall<ReqT, RespT> interceptCall(
Expand All @@ -867,6 +917,7 @@ public <ReqT,RespT> ClientCall<ReqT, RespT> interceptCall(
super.start(new SimpleForwardingClientCallListener<RespT>(listener) {
@Override public void onClose(Status status, Metadata trailers) {
onCloseCalled = true;
savedExecutor = callOptions.getExecutor();
super.onClose(status, trailers);
}
}, headers);
Expand Down

0 comments on commit 299851d

Please sign in to comment.