Skip to content

Commit

Permalink
all: remove 2-arg ClientStreamListener.closed()
Browse files Browse the repository at this point in the history
We used to have two ClientStreamListener.closed() methods. One is simply calling the other with default arg. This doubles debugging (e.g. #7921) and sometimes unit testing work. Deleting the 2-arg method to cleanup.

This PR is purely refactoring.
  • Loading branch information
dapengzhang0 authored Jun 29, 2021
1 parent f93cfe5 commit 4f09073
Show file tree
Hide file tree
Showing 20 changed files with 161 additions and 181 deletions.
10 changes: 6 additions & 4 deletions core/src/main/java/io/grpc/inprocess/InProcessTransport.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import io.grpc.Status;
import io.grpc.internal.ClientStream;
import io.grpc.internal.ClientStreamListener;
import io.grpc.internal.ClientStreamListener.RpcProgress;
import io.grpc.internal.ConnectionClientTransport;
import io.grpc.internal.GrpcAttributes;
import io.grpc.internal.GrpcUtil;
Expand Down Expand Up @@ -240,7 +241,7 @@ private ClientStream failedClientStream(
public void start(ClientStreamListener listener) {
statsTraceCtx.clientOutboundHeaders();
statsTraceCtx.streamClosed(status);
listener.closed(status, new Metadata());
listener.closed(status, RpcProgress.PROCESSED, new Metadata());
}
};
}
Expand Down Expand Up @@ -469,7 +470,8 @@ private synchronized boolean clientRequested(int numMessages) {
closed = true;
clientStream.statsTraceCtx.clientInboundTrailers(clientNotifyTrailers);
clientStream.statsTraceCtx.streamClosed(clientNotifyStatus);
clientStreamListener.closed(clientNotifyStatus, clientNotifyTrailers);
clientStreamListener.closed(
clientNotifyStatus, RpcProgress.PROCESSED, clientNotifyTrailers);
}
boolean nowReady = clientRequested > 0;
return !previouslyReady && nowReady;
Expand Down Expand Up @@ -579,7 +581,7 @@ private void notifyClientClose(Status status, Metadata trailers) {
closed = true;
clientStream.statsTraceCtx.clientInboundTrailers(trailers);
clientStream.statsTraceCtx.streamClosed(clientStatus);
clientStreamListener.closed(clientStatus, trailers);
clientStreamListener.closed(clientStatus, RpcProgress.PROCESSED, trailers);
} else {
clientNotifyStatus = clientStatus;
clientNotifyTrailers = trailers;
Expand Down Expand Up @@ -615,7 +617,7 @@ private synchronized boolean internalCancel(Status clientStatus) {
}
}
clientStream.statsTraceCtx.streamClosed(clientStatus);
clientStreamListener.closed(clientStatus, new Metadata());
clientStreamListener.closed(clientStatus, RpcProgress.PROCESSED, new Metadata());
return true;
}

Expand Down
5 changes: 0 additions & 5 deletions core/src/main/java/io/grpc/internal/ClientCallImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -671,11 +671,6 @@ private void runInternal() {
}
}

@Override
public void closed(Status status, Metadata trailers) {
closed(status, RpcProgress.PROCESSED, trailers);
}

@Override
public void closed(Status status, RpcProgress rpcProgress, Metadata trailers) {
PerfMark.startTask("ClientStreamListener.closed", tag);
Expand Down
17 changes: 1 addition & 16 deletions core/src/main/java/io/grpc/internal/ClientStreamListener.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,29 +25,14 @@ public interface ClientStreamListener extends StreamListener {
* Called upon receiving all header information from the remote end-point. Note that transports
* are not required to call this method if no header information is received, this would occur
* when a stream immediately terminates with an error and only
* {@link #closed(io.grpc.Status, Metadata)} is called.
* {@link #closed(io.grpc.Status, RpcProgress, Metadata)} is called.
*
* <p>This method should return quickly, as the same thread may be used to process other streams.
*
* @param headers the fully buffered received headers.
*/
void headersRead(Metadata headers);

/**
* Called when the stream is fully closed. {@link
* io.grpc.Status.Code#OK} is the only status code that is guaranteed
* to have been sent from the remote server. Any other status code may have been caused by
* abnormal stream termination. This is guaranteed to always be the final call on a listener. No
* further callbacks will be issued.
*
* <p>This method should return quickly, as the same thread may be used to process other streams.
*
* @param status details about the remote closure
* @param trailers trailing metadata
*/
// TODO(zdapeng): remove this method in favor of the 3-arg one.
void closed(Status status, Metadata trailers);

/**
* Called when the stream is fully closed. {@link
* io.grpc.Status.Code#OK} is the only status code that is guaranteed
Expand Down
15 changes: 3 additions & 12 deletions core/src/main/java/io/grpc/internal/DelayedStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import io.grpc.DecompressorRegistry;
import io.grpc.Metadata;
import io.grpc.Status;
import io.grpc.internal.ClientStreamListener.RpcProgress;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
Expand Down Expand Up @@ -235,7 +236,7 @@ public void start(ClientStreamListener listener) {
startTimeNanos = System.nanoTime();
}
if (savedError != null) {
listener.closed(savedError, new Metadata());
listener.closed(savedError, RpcProgress.PROCESSED, new Metadata());
return;
}

Expand Down Expand Up @@ -324,7 +325,7 @@ public void run() {
} else {
drainPendingCalls();
// Note that listener is a DelayedStreamListener
listener.closed(reason, new Metadata());
listener.closed(reason, RpcProgress.PROCESSED, new Metadata());
}
}

Expand Down Expand Up @@ -495,16 +496,6 @@ public void run() {
});
}

@Override
public void closed(final Status status, final Metadata trailers) {
delayOrExecute(new Runnable() {
@Override
public void run() {
realListener.closed(status, trailers);
}
});
}

@Override
public void closed(
final Status status, final RpcProgress rpcProgress,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,6 @@ public void headersRead(Metadata headers) {
delegate().headersRead(headers);
}

@Override
public void closed(Status status, Metadata trailers) {
delegate().closed(status, trailers);
}

@Override
public void closed(Status status, RpcProgress rpcProgress, Metadata trailers) {
delegate().closed(status, rpcProgress, trailers);
Expand Down
6 changes: 0 additions & 6 deletions core/src/main/java/io/grpc/internal/InternalSubchannel.java
Original file line number Diff line number Diff line change
Expand Up @@ -684,12 +684,6 @@ protected ClientStreamListener delegate() {
return listener;
}

@Override
public void closed(Status status, Metadata trailers) {
callTracer.reportCallEnded(status.isOk());
super.closed(status, trailers);
}

@Override
public void closed(
Status status, RpcProgress rpcProgress, Metadata trailers) {
Expand Down
12 changes: 4 additions & 8 deletions core/src/main/java/io/grpc/internal/RetriableStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.Status;
import io.grpc.internal.ClientStreamListener.RpcProgress;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collection;
Expand Down Expand Up @@ -444,7 +445,7 @@ public final void cancel(Status reason) {
Runnable runnable = commit(noopSubstream);

if (runnable != null) {
masterListener.closed(reason, new Metadata());
masterListener.closed(reason, RpcProgress.PROCESSED, new Metadata());
runnable.run();
return;
}
Expand Down Expand Up @@ -762,11 +763,6 @@ public void headersRead(Metadata headers) {
}
}

@Override
public void closed(Status status, Metadata trailers) {
closed(status, RpcProgress.PROCESSED, trailers);
}

@Override
public void closed(Status status, RpcProgress rpcProgress, Metadata trailers) {
synchronized (lock) {
Expand All @@ -779,7 +775,7 @@ public void closed(Status status, RpcProgress rpcProgress, Metadata trailers) {
if (substream.bufferLimitExceeded) {
commitAndRun(substream);
if (state.winningSubstream == substream) {
masterListener.closed(status, trailers);
masterListener.closed(status, rpcProgress, trailers);
}
return;
}
Expand Down Expand Up @@ -884,7 +880,7 @@ public void run() {

commitAndRun(substream);
if (state.winningSubstream == substream) {
masterListener.closed(status, trailers);
masterListener.closed(status, rpcProgress, trailers);
}
}

Expand Down
16 changes: 0 additions & 16 deletions core/src/test/java/io/grpc/internal/AbstractTransportTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -1157,12 +1157,6 @@ public void earlyServerClose_serverFailure_withClientCancelOnListenerClosed() th
final ClientStream clientStream =
client.newStream(methodDescriptor, new Metadata(), callOptions);
ClientStreamListenerBase clientStreamListener = new ClientStreamListenerBase() {
@Override
public void closed(Status status, Metadata trailers) {
super.closed(status, trailers);
// This simulates the blocking calls which can trigger clientStream.cancel().
clientStream.cancel(Status.CANCELLED.withCause(status.asRuntimeException()));
}

@Override
public void closed(
Expand Down Expand Up @@ -1245,11 +1239,6 @@ public void clientCancelFromWithinMessageRead() throws Exception {
public void headersRead(Metadata headers) {
}

@Override
public void closed(Status status, Metadata trailers) {
closed(status, RpcProgress.PROCESSED, trailers);
}

@Override
public void closed(
Status status, RpcProgress rpcProgress, Metadata trailers) {
Expand Down Expand Up @@ -2289,11 +2278,6 @@ public void headersRead(Metadata headers) {
this.headers.set(headers);
}

@Override
public void closed(Status status, Metadata trailers) {
closed(status, RpcProgress.PROCESSED, trailers);
}

@Override
public void closed(Status status, RpcProgress rpcProgress, Metadata trailers) {
if (this.status.isDone()) {
Expand Down
13 changes: 7 additions & 6 deletions core/src/test/java/io/grpc/internal/ClientCallImplTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package io.grpc.internal;

import static com.google.common.truth.Truth.assertThat;
import static io.grpc.internal.ClientStreamListener.RpcProgress.PROCESSED;
import static io.grpc.internal.GrpcUtil.ACCEPT_ENCODING_SPLITTER;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.junit.Assert.assertEquals;
Expand Down Expand Up @@ -173,7 +174,7 @@ public void statusPropagatedFromStreamToCallListener() {
final ClientStreamListener streamListener = listenerArgumentCaptor.getValue();
streamListener.headersRead(new Metadata());
Status status = Status.RESOURCE_EXHAUSTED.withDescription("simulated");
streamListener.closed(status , new Metadata());
streamListener.closed(status , PROCESSED, new Metadata());
executor.release();

verify(callListener).onClose(same(status), ArgumentMatchers.isA(Metadata.class));
Expand Down Expand Up @@ -205,7 +206,7 @@ public void exceptionInOnMessageTakesPrecedenceOverServer() {
*/
streamListener
.messagesAvailable(new SingleMessageProducer(new ByteArrayInputStream(new byte[]{})));
streamListener.closed(Status.OK, new Metadata());
streamListener.closed(Status.OK, PROCESSED, new Metadata());
executor.release();

verify(callListener).onClose(statusArgumentCaptor.capture(),
Expand Down Expand Up @@ -240,7 +241,7 @@ public void exceptionInOnHeadersTakesPrecedenceOverServer() {
* the call being counted as successful.
*/
streamListener.headersRead(new Metadata());
streamListener.closed(Status.OK, new Metadata());
streamListener.closed(Status.OK, PROCESSED, new Metadata());
executor.release();

verify(callListener).onClose(statusArgumentCaptor.capture(),
Expand Down Expand Up @@ -292,7 +293,7 @@ class PointOfNoReturnExecutor implements Executor {
final ClientStreamListener streamListener = listenerArgumentCaptor.getValue();

streamListener.headersRead(new Metadata());
streamListener.closed(Status.OK, new Metadata());
streamListener.closed(Status.OK, PROCESSED, new Metadata());
}

@Test
Expand All @@ -319,7 +320,7 @@ public void exceptionInOnReadyTakesPrecedenceOverServer() {
* the call being counted as successful.
*/
streamListener.onReady();
streamListener.closed(Status.OK, new Metadata());
streamListener.closed(Status.OK, PROCESSED, new Metadata());
executor.release();

verify(callListener).onClose(statusArgumentCaptor.capture(),
Expand Down Expand Up @@ -664,7 +665,7 @@ private void checkContext() {
listener.headersRead(new Metadata());
listener.messagesAvailable(new SingleMessageProducer(new ByteArrayInputStream(new byte[0])));
listener.messagesAvailable(new SingleMessageProducer(new ByteArrayInputStream(new byte[0])));
listener.closed(Status.OK, new Metadata());
listener.closed(Status.OK, PROCESSED, new Metadata());

assertTrue(latch.await(5, TimeUnit.SECONDS));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,8 @@ public void uncaughtException(Thread t, Throwable e) {
assertEquals(1, delayedTransport.getPendingStreamsCount());
stream.cancel(Status.CANCELLED);
assertEquals(0, delayedTransport.getPendingStreamsCount());
verify(streamListener).closed(same(Status.CANCELLED), any(Metadata.class));
verify(streamListener).closed(
same(Status.CANCELLED), same(RpcProgress.PROCESSED), any(Metadata.class));
verifyNoMoreInteractions(mockRealTransport);
verifyNoMoreInteractions(mockRealStream);
}
Expand All @@ -233,7 +234,8 @@ public void uncaughtException(Thread t, Throwable e) {

// Further newStream() will return a failing stream
stream = delayedTransport.newStream(method, new Metadata(), CallOptions.DEFAULT);
verify(streamListener, never()).closed(any(Status.class), any(Metadata.class));
verify(streamListener, never()).closed(
any(Status.class), any(RpcProgress.class), any(Metadata.class));
stream.start(streamListener);
verify(streamListener).closed(
statusCaptor.capture(), any(RpcProgress.class), any(Metadata.class));
Expand Down
11 changes: 6 additions & 5 deletions core/src/test/java/io/grpc/internal/DelayedStreamTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import io.grpc.DecompressorRegistry;
import io.grpc.Metadata;
import io.grpc.Status;
import io.grpc.internal.ClientStreamListener.RpcProgress;
import io.grpc.internal.testing.SingleMessageProducer;
import java.io.ByteArrayInputStream;
import java.io.InputStream;
Expand Down Expand Up @@ -312,7 +313,7 @@ public void setStream_getAttributes() {
public void startThenCancelled() {
stream.start(listener);
stream.cancel(Status.CANCELLED);
verify(listener).closed(eq(Status.CANCELLED), any(Metadata.class));
verify(listener).closed(eq(Status.CANCELLED), any(RpcProgress.class), any(Metadata.class));
}

@Test
Expand Down Expand Up @@ -408,7 +409,7 @@ public void start(ClientStreamListener passedListener) {
passedListener.messagesAvailable(producer1);
passedListener.onReady();
passedListener.messagesAvailable(producer2);
passedListener.closed(status, trailers);
passedListener.closed(status, RpcProgress.PROCESSED, trailers);

verifyNoMoreInteractions(listener);
}
Expand All @@ -418,7 +419,7 @@ public void start(ClientStreamListener passedListener) {
inOrder.verify(listener).messagesAvailable(producer1);
inOrder.verify(listener).onReady();
inOrder.verify(listener).messagesAvailable(producer2);
inOrder.verify(listener).closed(status, trailers);
inOrder.verify(listener).closed(status, RpcProgress.PROCESSED, trailers);
}

@Test
Expand All @@ -439,8 +440,8 @@ public void listener_noQueued() {
verify(listener).headersRead(headers);
delayedListener.messagesAvailable(producer);
verify(listener).messagesAvailable(producer);
delayedListener.closed(status, trailers);
verify(listener).closed(status, trailers);
delayedListener.closed(status, RpcProgress.PROCESSED, trailers);
verify(listener).closed(status, RpcProgress.PROCESSED, trailers);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.grpc.ForwardingTestUtil;
import io.grpc.Metadata;
import io.grpc.Status;
import io.grpc.internal.ClientStreamListener.RpcProgress;
import io.grpc.internal.StreamListener.MessageProducer;
import java.lang.reflect.Method;
import java.util.Collections;
Expand Down Expand Up @@ -61,8 +62,8 @@ public void headersReadTest() {
public void closedTest() {
Status status = Status.UNKNOWN;
Metadata trailers = new Metadata();
forward.closed(status, trailers);
verify(mock).closed(same(status), same(trailers));
forward.closed(status, RpcProgress.PROCESSED, trailers);
verify(mock).closed(same(status), same(RpcProgress.PROCESSED), same(trailers));
}

@Test
Expand Down
Loading

0 comments on commit 4f09073

Please sign in to comment.